use std::fmt::Debug;
use std::{hash::Hash, time::SystemTime};
use crate::{concurrency::Duration, Message};
#[cfg(feature = "cluster")]
use crate::{message::BoxedDowncastErr, BytesConvertable};
/// Bound alias for types usable as a job key when the `cluster` feature is on.
///
/// On top of the usual hashing/equality/thread-safety requirements, a key must
/// be [`BytesConvertable`] so that it can be carried in serialized job metadata.
#[cfg(feature = "cluster")]
pub trait JobKey:
    Debug + Hash + Send + Sync + Clone + Eq + PartialEq + BytesConvertable + 'static
{
}

/// Blanket implementation: every type meeting the bounds is a [`JobKey`].
#[cfg(feature = "cluster")]
impl<T> JobKey for T where
    T: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + BytesConvertable + 'static
{
}
/// Bound alias for types usable as a job key when the `cluster` feature is off.
///
/// A key must be debuggable, hashable, comparable, cloneable, thread-safe and
/// `'static`.
#[cfg(not(feature = "cluster"))]
pub trait JobKey: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static {}

/// Blanket implementation: every type meeting the bounds is a [`JobKey`].
#[cfg(not(feature = "cluster"))]
impl<T> JobKey for T where T: Debug + Hash + Send + Sync + Clone + Eq + PartialEq + 'static {}
/// Timestamps and an optional time-to-live attached to a [`Job`].
///
/// With the `cluster` feature enabled, only `submit_time` and `ttl` are written
/// by the `BytesConvertable` implementation; the other two timestamps are
/// re-initialised from `Default` when decoding.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct JobOptions {
/// Time the job was submitted; `Job::is_expired` measures the TTL from here.
pub submit_time: SystemTime,
/// Timestamp overwritten by `Job::set_factory_time`
/// (presumably when the factory receives the job — confirm against callers).
pub factory_time: SystemTime,
/// Timestamp overwritten by `Job::set_worker_time`
/// (presumably when a worker receives the job — confirm against callers).
pub worker_time: SystemTime,
/// Optional time-to-live; `None` means the job never expires.
pub ttl: Option<Duration>,
}
impl Default for JobOptions {
    /// Creates options stamped with the current system time and no TTL.
    fn default() -> Self {
        // Each timestamp is sampled on its own, in field order, so the three
        // values may differ by the time taken between the calls.
        let submit_time = SystemTime::now();
        let factory_time = SystemTime::now();
        let worker_time = SystemTime::now();
        Self {
            submit_time,
            factory_time,
            worker_time,
            ttl: None,
        }
    }
}
#[cfg(feature = "cluster")]
impl BytesConvertable for JobOptions {
    /// Encodes the options as exactly 16 bytes.
    ///
    /// Layout: bytes `0..8` hold the submit time as big-endian `u64`
    /// nanoseconds since the Unix epoch; bytes `8..16` hold the TTL as
    /// big-endian `u64` nanoseconds, with `0` standing for "no TTL".
    /// `factory_time` and `worker_time` are not encoded.
    ///
    /// Nanosecond counts that do not fit in a `u64` saturate at `u64::MAX`
    /// instead of wrapping around. Note that a TTL of exactly zero is encoded
    /// the same way as `None`.
    ///
    /// # Panics
    ///
    /// Panics if `submit_time` is earlier than the Unix epoch.
    fn into_bytes(self) -> Vec<u8> {
        // `Duration::as_nanos` yields a `u128`; a bare `as u64` would silently
        // keep only the low 64 bits, so clamp before narrowing.
        fn saturating_nanos(nanos: u128) -> u64 {
            nanos.min(u128::from(u64::MAX)) as u64
        }

        let submit_nanos = saturating_nanos(
            self.submit_time
                .duration_since(std::time::UNIX_EPOCH)
                .expect("Time went backwards")
                .as_nanos(),
        );
        let ttl_nanos = self.ttl.map_or(0, |ttl| saturating_nanos(ttl.as_nanos()));

        let mut data = Vec::with_capacity(16);
        data.extend_from_slice(&submit_nanos.to_be_bytes());
        data.extend_from_slice(&ttl_nanos.to_be_bytes());
        data
    }

    /// Decodes options from the 16-byte layout written by `into_bytes`.
    ///
    /// Input of any other length yields `Self::default()`. The timestamps that
    /// are not part of the encoding are taken from `Default`, and an encoded
    /// TTL of `0` decodes to `None`.
    fn from_bytes(mut data: Vec<u8>) -> Self {
        if data.len() != 16 {
            return Self::default();
        }

        // After the split, `data` holds the submit time and `ttl_bytes` the TTL.
        let ttl_bytes = data.split_off(8);
        let submit_nanos = <u64 as BytesConvertable>::from_bytes(data);
        let ttl_nanos = <u64 as BytesConvertable>::from_bytes(ttl_bytes);

        Self {
            submit_time: std::time::UNIX_EPOCH + Duration::from_nanos(submit_nanos),
            ttl: if ttl_nanos > 0 {
                Some(Duration::from_nanos(ttl_nanos))
            } else {
                None
            },
            ..Default::default()
        }
    }
}
/// A message paired with the key that identifies it and its [`JobOptions`].
pub struct Job<TKey, TMsg>
where
TKey: JobKey,
TMsg: Message,
{
/// Key identifying this job.
pub key: TKey,
/// The message payload carried by this job.
pub msg: TMsg,
/// Timestamps and optional TTL for this job.
pub options: JobOptions,
}
#[cfg(feature = "cluster")]
impl<TKey, TMsg> Job<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Number of leading metadata bytes occupied by the encoded [`JobOptions`].
    const OPTIONS_LEN: usize = 16;

    /// Splits the job into its metadata bytes and its inner message.
    ///
    /// The metadata is the 16-byte encoding of the options followed by the
    /// encoded key.
    fn serialize_meta(self) -> (Vec<u8>, TMsg) {
        let options_bytes = self.options.into_bytes();
        let key_bytes = self.key.into_bytes();
        let mut meta = vec![0u8; Self::OPTIONS_LEN + key_bytes.len()];
        meta[..Self::OPTIONS_LEN].copy_from_slice(&options_bytes);
        meta[Self::OPTIONS_LEN..].copy_from_slice(&key_bytes);
        (meta, self.msg)
    }

    /// Recovers the key and options from metadata written by `serialize_meta`.
    ///
    /// # Errors
    ///
    /// Returns `BoxedDowncastErr` when the metadata is absent or shorter than
    /// the fixed options prefix.
    fn deserialize_meta(
        maybe_bytes: Option<Vec<u8>>,
    ) -> Result<(TKey, JobOptions), BoxedDowncastErr> {
        // `Vec::split_off` panics when the index exceeds the length, so a
        // truncated buffer must be rejected before splitting.
        let mut meta_bytes = match maybe_bytes {
            Some(bytes) if bytes.len() >= Self::OPTIONS_LEN => bytes,
            _ => return Err(BoxedDowncastErr),
        };
        let key_bytes = meta_bytes.split_off(Self::OPTIONS_LEN);
        Ok((
            TKey::from_bytes(key_bytes),
            JobOptions::from_bytes(meta_bytes),
        ))
    }
}
#[cfg(feature = "cluster")]
impl<TKey, TMsg> Message for Job<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// A job is serializable exactly when its inner message type is.
    fn serializable() -> bool {
        TMsg::serializable()
    }

    /// Serializes the inner message and attaches the job's key/options as the
    /// message metadata.
    ///
    /// Any metadata produced by the inner message is replaced. An inner
    /// message that serializes to a call reply is rejected.
    fn serialize(self) -> Result<crate::message::SerializedMessage, BoxedDowncastErr> {
        use crate::message::SerializedMessage;

        let (meta_bytes, msg) = self.serialize_meta();
        let metadata = Some(meta_bytes);
        match msg.serialize()? {
            SerializedMessage::Cast { variant, args, .. } => Ok(SerializedMessage::Cast {
                variant,
                args,
                metadata,
            }),
            SerializedMessage::Call {
                variant,
                args,
                reply,
                ..
            } => Ok(SerializedMessage::Call {
                variant,
                args,
                reply,
                metadata,
            }),
            SerializedMessage::CallReply(..) => Err(BoxedDowncastErr),
        }
    }

    /// Rebuilds a job from a serialized cast or call.
    ///
    /// The metadata is decoded into the key and options first; the inner
    /// message is then deserialized from the same payload with its metadata
    /// cleared. Call replies are rejected.
    fn deserialize(bytes: crate::message::SerializedMessage) -> Result<Self, BoxedDowncastErr> {
        use crate::message::SerializedMessage;

        // Detach the metadata and rebuild the payload the inner message expects.
        let (metadata, inner) = match bytes {
            SerializedMessage::CallReply(..) => return Err(BoxedDowncastErr),
            SerializedMessage::Cast {
                variant,
                args,
                metadata,
            } => (
                metadata,
                SerializedMessage::Cast {
                    variant,
                    args,
                    metadata: None,
                },
            ),
            SerializedMessage::Call {
                variant,
                args,
                reply,
                metadata,
            } => (
                metadata,
                SerializedMessage::Call {
                    variant,
                    args,
                    reply,
                    metadata: None,
                },
            ),
        };

        let (key, options) = Self::deserialize_meta(metadata)?;
        let msg = TMsg::deserialize(inner)?;
        Ok(Self { msg, key, options })
    }
}
impl<TKey, TMsg> Job<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Returns `true` when the job has a TTL and more than that much time has
    /// passed since `submit_time`.
    ///
    /// Jobs without a TTL never expire. If `submit_time` lies ahead of the
    /// current system clock (`SystemTime::elapsed` fails, e.g. after a clock
    /// adjustment), the job is treated as not expired rather than panicking.
    pub(crate) fn is_expired(&self) -> bool {
        match self.options.ttl {
            Some(ttl) => self
                .options
                .submit_time
                .elapsed()
                .map_or(false, |age| age > ttl),
            None => false,
        }
    }

    /// Overwrites `options.factory_time` with the current system time.
    pub(crate) fn set_factory_time(&mut self) {
        self.options.factory_time = SystemTime::now();
    }

    /// Overwrites `options.worker_time` with the current system time.
    pub(crate) fn set_worker_time(&mut self) {
        self.options.worker_time = SystemTime::now();
    }
}
// Round-trip tests for the cluster serialization of `Job` and for the
// equivalence between a dispatched `FactoryMessage` and the bare job.
#[cfg(feature = "cluster")]
#[cfg(test)]
mod tests {
    use super::super::FactoryMessage;
    use super::Job;
    use crate::{
        concurrency::Duration, factory::JobOptions, serialization::BytesConvertable, Message,
    };
    use crate::{message::SerializedMessage, RpcReplyPort};

    /// Minimal job key wrapping a single integer.
    #[derive(Eq, Hash, PartialEq, Clone, Debug)]
    struct TestKey {
        item: u64,
    }

    impl crate::BytesConvertable for TestKey {
        fn from_bytes(bytes: Vec<u8>) -> Self {
            Self {
                item: u64::from_bytes(bytes),
            }
        }
        fn into_bytes(self) -> Vec<u8> {
            self.item.into_bytes()
        }
    }

    /// Test message with one cast-style and one call-style variant.
    #[derive(Debug)]
    enum TestMessage {
        #[allow(dead_code)]
        A(String),
        #[allow(dead_code)]
        B(String, RpcReplyPort<u128>),
    }

    impl crate::Message for TestMessage {
        fn serializable() -> bool {
            true
        }

        /// `A` becomes a cast and `B` becomes a call; the reply port of `B` is
        /// replaced by a fresh, unconnected one.
        fn serialize(
            self,
        ) -> Result<crate::message::SerializedMessage, crate::message::BoxedDowncastErr> {
            match self {
                Self::A(args) => Ok(crate::message::SerializedMessage::Cast {
                    variant: "A".to_string(),
                    args: args.into_bytes(),
                    metadata: None,
                }),
                Self::B(args, _reply) => {
                    let (tx, _rx) = crate::concurrency::oneshot();
                    Ok(crate::message::SerializedMessage::Call {
                        variant: "B".to_string(),
                        args: args.into_bytes(),
                        reply: tx.into(),
                        metadata: None,
                    })
                }
            }
        }

        /// Inverse of `serialize`; unknown variants and call replies fail.
        fn deserialize(
            bytes: crate::message::SerializedMessage,
        ) -> Result<Self, crate::message::BoxedDowncastErr> {
            match bytes {
                crate::message::SerializedMessage::Cast { variant, args, .. } => {
                    match variant.as_str() {
                        "A" => Ok(Self::A(String::from_bytes(args))),
                        _ => Err(crate::message::BoxedDowncastErr),
                    }
                }
                crate::message::SerializedMessage::Call { variant, args, .. } => {
                    match variant.as_str() {
                        "B" => {
                            let (tx, _rx) = crate::concurrency::oneshot();
                            Ok(Self::B(String::from_bytes(args), tx.into()))
                        }
                        _ => Err(crate::message::BoxedDowncastErr),
                    }
                }
                _ => Err(crate::message::BoxedDowncastErr),
            }
        }
    }

    type TheJob = Job<TestKey, TestMessage>;

    /// A job survives a serialize/deserialize round trip for both the cast
    /// (`A`) and call (`B`) variants: key, submit time, TTL and payload match.
    #[test]
    #[tracing_test::traced_test]
    fn test_job_serialization() {
        // Cast variant, no TTL.
        let job_a = TheJob {
            key: TestKey { item: 123 },
            msg: TestMessage::A("Hello".to_string()),
            options: JobOptions::default(),
        };
        let expected_a = TheJob {
            key: TestKey { item: 123 },
            msg: TestMessage::A("Hello".to_string()),
            options: job_a.options.clone(),
        };

        let serialized_a = job_a.serialize().expect("Failed to serialize job A");
        let deserialized_a =
            TheJob::deserialize(serialized_a).expect("Failed to deserialize job A");

        assert_eq!(expected_a.key, deserialized_a.key);
        assert_eq!(
            expected_a.options.submit_time,
            deserialized_a.options.submit_time
        );
        assert_eq!(expected_a.options.ttl, deserialized_a.options.ttl);

        if let TestMessage::A(the_msg) = deserialized_a.msg {
            assert_eq!("Hello".to_string(), the_msg);
        } else {
            panic!("Failed to deserialize the message payload");
        }

        // Call variant, with a TTL.
        let job_b = TheJob {
            key: TestKey { item: 456 },
            msg: TestMessage::B("Hi".to_string(), crate::concurrency::oneshot().0.into()),
            options: JobOptions {
                ttl: Some(Duration::from_millis(1000)),
                ..Default::default()
            },
        };
        let expected_b = TheJob {
            key: TestKey { item: 456 },
            msg: TestMessage::B("Hi".to_string(), crate::concurrency::oneshot().0.into()),
            options: job_b.options.clone(),
        };

        let serialized_b = job_b.serialize().expect("Failed to serialize job B");
        let deserialized_b =
            TheJob::deserialize(serialized_b).expect("Failed to deserialize job B");

        assert_eq!(expected_b.key, deserialized_b.key);
        assert_eq!(
            expected_b.options.submit_time,
            deserialized_b.options.submit_time
        );
        assert_eq!(expected_b.options.ttl, deserialized_b.options.ttl);

        if let TestMessage::B(the_msg, _) = deserialized_b.msg {
            assert_eq!("Hi".to_string(), the_msg);
        } else {
            panic!("Failed to deserialize the message payload");
        }
    }

    /// Wrapping a job in `FactoryMessage::Dispatch` serializes to the same
    /// cast (variant, args and metadata) as serializing the job directly.
    #[test]
    #[tracing_test::traced_test]
    fn test_factory_message_serialization() {
        let job_a = TheJob {
            key: TestKey { item: 123 },
            msg: TestMessage::A("Hello".to_string()),
            options: JobOptions::default(),
        };
        let expected_a = TheJob {
            key: TestKey { item: 123 },
            msg: TestMessage::A("Hello".to_string()),
            options: job_a.options.clone(),
        };

        let msg = FactoryMessage::Dispatch(job_a);
        let serialized_a = msg.serialize().expect("Failed to serialize");
        let serialized_a_prime = expected_a.serialize().expect("Failed to serialize");

        if let (
            SerializedMessage::Cast {
                variant: variant1,
                args: args1,
                metadata: metadata1,
            },
            SerializedMessage::Cast {
                variant: variant2,
                args: args2,
                metadata: metadata2,
            },
        ) = (serialized_a, serialized_a_prime)
        {
            assert_eq!(variant1, variant2);
            assert_eq!(args1, args2);
            assert_eq!(metadata1, metadata2);
        } else {
            panic!("Non-cast serialization")
        }
    }
}