use crate::snowflake::Snowflake;
use crate::types::EntityAddress;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level message wrapper: exactly one of a request, a chunk
/// acknowledgement, or an interrupt.
///
/// Derives `Serialize`/`Deserialize`; the unit tests in this file round-trip
/// it through MessagePack via `rmp_serde`.
// NOTE(review): variant names/order are presumably part of the serialized form — confirm before renaming or reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Envelope {
/// Wraps an [`EnvelopeRequest`].
Request(EnvelopeRequest),
/// Wraps an [`AckChunk`].
AckChunk(AckChunk),
/// Wraps an [`Interrupt`].
Interrupt(Interrupt),
}
/// Header key `"x-cruster-stream"`.
// NOTE(review): presumably placed in `EnvelopeRequest::headers` to flag a streaming request — confirm against the call sites.
pub const STREAM_HEADER_KEY: &str = "x-cruster-stream";
/// Header value `"1"`; going by its name, the value paired with [`STREAM_HEADER_KEY`].
pub const STREAM_HEADER_VALUE: &str = "1";
/// Header key `"x-cruster-request-id"`.
// NOTE(review): presumably carries a request id in header form — confirm the value encoding against the call sites.
pub const REQUEST_ID_HEADER_KEY: &str = "x-cruster-request-id";
/// Payload of [`Envelope::Request`]: an identified, addressed, tagged byte
/// payload plus string headers, optional tracing fields and delivery options.
///
/// The last three fields carry `#[serde(default)]`, so serialized input that
/// omits them still deserializes, with each taking its type's `Default` value.
// NOTE(review): if this struct is encoded positionally (rmp_serde's compact form does this), field order is part of the wire format — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvelopeRequest {
/// Identifier of this request.
pub request_id: Snowflake,
/// Entity address attached to the request (presumably the destination — confirm with callers).
pub address: EntityAddress,
/// Free-form tag string; the tests use `"getProfile"`, so presumably an operation name — confirm.
pub tag: String,
/// Raw payload bytes; not interpreted by this type.
pub payload: Vec<u8>,
/// String key/value headers.
pub headers: HashMap<String, String>,
/// Optional span id (presumably distributed-tracing context — confirm).
pub span_id: Option<String>,
/// Optional trace id (presumably distributed-tracing context — confirm).
pub trace_id: Option<String>,
/// Optional sampling flag (presumably the tracing sampling decision — confirm).
pub sampled: Option<bool>,
/// Persistence flag; `false` when absent from serialized input.
#[serde(default)]
pub persisted: bool,
/// Interruptibility setting; `Uninterruptible::default()` when absent from serialized input.
#[serde(default)]
pub uninterruptible: crate::schema::Uninterruptible,
/// Optional UTC timestamp; `None` when absent from serialized input.
// NOTE(review): presumably the earliest time the request should be delivered — confirm with the consumer.
#[serde(default)]
pub deliver_at: Option<DateTime<Utc>>,
}
/// Payload of [`Envelope::AckChunk`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckChunk {
/// Identifier of the request this acknowledgement refers to.
pub request_id: Snowflake,
/// A second identifier (presumably that of the chunk/reply being acknowledged — confirm).
pub id: Snowflake,
/// Sequence number (presumably the chunk's position within the stream — confirm).
pub sequence: i32,
}
/// Payload of [`Envelope::Interrupt`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Interrupt {
/// Identifier of the request this interrupt refers to.
pub request_id: Snowflake,
/// Entity address attached to the interrupt (presumably where the request is running — confirm).
pub address: EntityAddress,
}
#[cfg(test)]
mod tests {
    // Serde round-trip tests for the envelope types, encoded as MessagePack
    // through `rmp_serde`.

    use super::*;
    use crate::types::{EntityId, EntityType, ShardId};

    /// Builds a request with every non-defaulted field populated, so that
    /// round-trip assertions compare meaningful values.
    fn sample_request() -> EnvelopeRequest {
        EnvelopeRequest {
            request_id: Snowflake(1000),
            address: EntityAddress {
                shard_id: ShardId::new("default", 1),
                entity_type: EntityType::new("User"),
                entity_id: EntityId::new("u-1"),
            },
            tag: "getProfile".into(),
            payload: vec![1, 2, 3],
            headers: HashMap::from([("x-trace".into(), "abc".into())]),
            span_id: Some("span-1".into()),
            trace_id: Some("trace-1".into()),
            sampled: Some(true),
            persisted: false,
            uninterruptible: Default::default(),
            deliver_at: None,
        }
    }

    /// Encodes `envelope` with `rmp_serde` and decodes the bytes back.
    fn roundtrip(envelope: &Envelope) -> Envelope {
        let bytes = rmp_serde::to_vec(envelope).expect("envelope should encode");
        rmp_serde::from_slice(&bytes).expect("envelope should decode")
    }

    /// Unwraps the `Request` variant, reporting the actual variant on mismatch.
    fn expect_request(envelope: Envelope) -> EnvelopeRequest {
        match envelope {
            Envelope::Request(request) => request,
            other => panic!("expected Request variant, got {:?}", other),
        }
    }

    #[test]
    fn envelope_request_serde_roundtrip() {
        let req = sample_request();
        let decoded = expect_request(roundtrip(&Envelope::Request(req.clone())));

        // Compare every field so a regression in any of them is caught.
        assert_eq!(decoded.request_id, req.request_id);
        assert_eq!(decoded.address, req.address);
        assert_eq!(decoded.tag, req.tag);
        assert_eq!(decoded.payload, req.payload);
        assert_eq!(decoded.headers, req.headers);
        assert_eq!(decoded.span_id, req.span_id);
        assert_eq!(decoded.trace_id, req.trace_id);
        assert_eq!(decoded.sampled, req.sampled);
        assert_eq!(decoded.persisted, req.persisted);
        assert_eq!(decoded.uninterruptible, req.uninterruptible);
        assert_eq!(decoded.deliver_at, req.deliver_at);
    }

    #[test]
    fn envelope_request_preserves_uninterruptible() {
        let mut req = sample_request();
        req.persisted = true;
        req.uninterruptible = crate::schema::Uninterruptible::Server;

        let decoded = expect_request(roundtrip(&Envelope::Request(req)));

        assert!(decoded.persisted);
        assert_eq!(
            decoded.uninterruptible,
            crate::schema::Uninterruptible::Server
        );
    }

    #[test]
    fn envelope_request_preserves_deliver_at() {
        let deliver_time = chrono::Utc::now() + chrono::Duration::hours(1);
        let mut req = sample_request();
        req.deliver_at = Some(deliver_time);

        let decoded = expect_request(roundtrip(&Envelope::Request(req)));

        assert_eq!(decoded.deliver_at, Some(deliver_time));
    }

    #[test]
    fn envelope_request_deliver_at_defaults_to_none() {
        /// Same fields as `EnvelopeRequest`, in the same order, minus the
        /// trailing `deliver_at`. Encoding this yields a payload in which
        /// the field is absent, which is what `#[serde(default)]` must handle.
        #[derive(Serialize)]
        struct RequestWithoutDeliverAt {
            request_id: Snowflake,
            address: EntityAddress,
            tag: String,
            payload: Vec<u8>,
            headers: HashMap<String, String>,
            span_id: Option<String>,
            trace_id: Option<String>,
            sampled: Option<bool>,
            persisted: bool,
            uninterruptible: crate::schema::Uninterruptible,
        }

        let full = sample_request();
        let truncated = RequestWithoutDeliverAt {
            request_id: full.request_id.clone(),
            address: full.address.clone(),
            tag: full.tag.clone(),
            payload: full.payload.clone(),
            headers: full.headers.clone(),
            span_id: full.span_id.clone(),
            trace_id: full.trace_id.clone(),
            sampled: full.sampled,
            persisted: full.persisted,
            uninterruptible: full.uninterruptible.clone(),
        };

        let bytes = rmp_serde::to_vec(&truncated).expect("truncated request should encode");
        let decoded: EnvelopeRequest =
            rmp_serde::from_slice(&bytes).expect("payload without deliver_at should decode");

        // The missing field falls back to its default ...
        assert_eq!(decoded.deliver_at, None);
        // ... and the fields that were present are still decoded correctly.
        assert_eq!(decoded.request_id, full.request_id);
        assert_eq!(decoded.tag, full.tag);
        assert_eq!(decoded.payload, full.payload);
    }

    #[test]
    fn ack_chunk_serde_roundtrip() {
        let ack = AckChunk {
            request_id: Snowflake(100),
            id: Snowflake(200),
            sequence: 5,
        };

        match roundtrip(&Envelope::AckChunk(ack.clone())) {
            Envelope::AckChunk(decoded) => {
                assert_eq!(decoded.request_id, ack.request_id);
                assert_eq!(decoded.id, ack.id);
                assert_eq!(decoded.sequence, ack.sequence);
            }
            other => panic!("expected AckChunk variant, got {:?}", other),
        }
    }

    #[test]
    fn interrupt_serde_roundtrip() {
        let int = Interrupt {
            request_id: Snowflake(300),
            address: EntityAddress {
                shard_id: ShardId::new("default", 2),
                entity_type: EntityType::new("Order"),
                entity_id: EntityId::new("o-1"),
            },
        };

        match roundtrip(&Envelope::Interrupt(int.clone())) {
            Envelope::Interrupt(decoded) => {
                assert_eq!(decoded.request_id, int.request_id);
                assert_eq!(decoded.address, int.address);
            }
            other => panic!("expected Interrupt variant, got {:?}", other),
        }
    }
}