use prost::Message;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use super::Outputs;
use crate::types::{LinkMessage, Payload};
fn test_typed_output<T: Send + Sync + Clone + std::fmt::Debug + PartialEq + 'static>(
expected_data: T,
expected_serialized: Vec<u8>,
serializer: impl for<'b, 'a> Fn(&'b mut Vec<u8>, &'a T) -> anyhow::Result<()>
+ Send
+ Sync
+ 'static,
) {
let hlc = uhlc::HLC::default();
let key: Arc<str> = "test".into();
let (tx, rx) = flume::unbounded::<LinkMessage>();
let mut outputs = Outputs {
hmap: HashMap::from([(key.clone(), vec![tx])]),
hlc: Arc::new(hlc),
};
let output = outputs
.take(&key)
.expect("Wrong key provided")
.typed(serializer);
output
.try_send(expected_data.clone(), None)
.expect("Failed to send the message");
let message = rx.recv().expect("Received no message");
match message {
LinkMessage::Data(data) => match &*data {
Payload::Bytes(_) => panic!("Unexpected bytes payload"),
Payload::Typed((dyn_data, serializer)) => {
let mut dyn_serialized = Vec::new();
(serializer)(&mut dyn_serialized, dyn_data.clone()).expect("Failed to serialize");
assert_eq!(expected_serialized, dyn_serialized);
let data = (**dyn_data)
.as_any()
.downcast_ref::<T>()
.expect("Failed to downcast");
assert_eq!(expected_data, *data);
}
},
LinkMessage::Watermark(_) => panic!("Unexpected watermark message"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestData {
pub field1: u8,
pub field2: String,
pub field3: f64,
}
#[test]
fn test_serde_json() {
let expected_data = TestData {
field1: 1u8,
field2: "two".into(),
field3: 0.3f64,
};
let expected_serialized =
serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize");
let serializer = |buffer: &mut Vec<u8>, data: &TestData| {
serde_json::ser::to_writer(buffer, data).map_err(|e| anyhow::anyhow!(e))
};
test_typed_output(expected_data, expected_serialized, serializer)
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TestProto {
#[prost(int64, tag = "1")]
pub field1: i64,
#[prost(string, tag = "2")]
pub field2: ::prost::alloc::string::String,
#[prost(double, tag = "3")]
pub field3: f64,
}
#[test]
fn test_protobuf_prost() {
let expected_data = TestProto {
field1: 1i64,
field2: "two".into(),
field3: 0.3f64,
};
let expected_serialized = expected_data.encode_to_vec();
let serializer = |buffer: &mut Vec<u8>, data: &TestProto| {
data.encode(buffer).map_err(|e| anyhow::anyhow!(e))
};
test_typed_output(expected_data, expected_serialized, serializer)
}