use std::collections::HashMap;
use dactor::interceptor::SendMode;
use dactor::node::{ActorId, NodeId};
use dactor::remote::{
receive_envelope_body_versioned, MessageVersionHandler, WireEnvelope,
WireHeaders,
};
use dactor::type_registry::TypeRegistry;
fn make_envelope(message_type: &str, body: Vec<u8>, version: Option<u32>) -> WireEnvelope {
WireEnvelope {
target: ActorId {
node: NodeId("test-node".into()),
local: 1,
},
target_name: "test-actor".into(),
message_type: message_type.into(),
send_mode: SendMode::Tell,
headers: WireHeaders::new(),
body,
request_id: None,
version,
}
}
fn empty_handlers() -> HashMap<String, Box<dyn MessageVersionHandler>> {
HashMap::new()
}
struct PlaceOrderMigrator;
impl MessageVersionHandler for PlaceOrderMigrator {
fn message_type(&self) -> &'static str {
"test::PlaceOrder"
}
fn migrate(&self, payload: &[u8], from_version: u32, to_version: u32) -> Option<Vec<u8>> {
match (from_version, to_version) {
(1, 2) => {
let mut v2 = payload.to_vec();
v2.push(0); Some(v2)
}
(2, 3) => {
Some(payload.to_vec())
}
(1, 3) => {
let v2 = self.migrate(payload, 1, 2)?;
self.migrate(&v2, 2, 3)
}
(2, 1) => {
None
}
_ => None,
}
}
}
fn place_order_handlers() -> HashMap<String, Box<dyn MessageVersionHandler>> {
let mut h: HashMap<String, Box<dyn MessageVersionHandler>> = HashMap::new();
h.insert("test::PlaceOrder".into(), Box::new(PlaceOrderMigrator));
h
}
fn bytes_registry() -> TypeRegistry {
let mut r = TypeRegistry::new();
r.register("test::PlaceOrder", |bytes: &[u8]| {
Ok(Box::new(bytes.to_vec()))
});
r.register("test::OtherMsg", |bytes: &[u8]| {
Ok(Box::new(bytes.to_vec()))
});
r
}
#[test]
fn v1_to_v2_migration_adds_priority_byte() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let v1_body = vec![2, b'A', b'B', 0, 0, 0, 10];
let envelope = make_envelope("test::PlaceOrder", v1_body, Some(1));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2)).unwrap();
let body = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*body, vec![2, b'A', b'B', 0, 0, 0, 10, 0]);
}
#[test]
fn v2_to_v3_migration_passes_through() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let v2_body = vec![2, b'A', b'B', 0, 0, 0, 10, 5];
let envelope = make_envelope("test::PlaceOrder", v2_body.clone(), Some(2));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(3)).unwrap();
let body = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*body, v2_body);
}
#[test]
fn chained_v1_to_v3_migration() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let v1_body = vec![2, b'A', b'B', 0, 0, 0, 10];
let envelope = make_envelope("test::PlaceOrder", v1_body, Some(1));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(3)).unwrap();
let body = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*body, vec![2, b'A', b'B', 0, 0, 0, 10, 0]);
}
#[test]
fn downgrade_v2_to_v1_rejected() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let v2_body = vec![2, b'A', b'B', 0, 0, 0, 10, 5];
let envelope = make_envelope("test::PlaceOrder", v2_body, Some(2));
let result = receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(1));
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
err.message.contains("cannot migrate from v2 to v1"),
"error: {}",
err.message
);
}
#[test]
fn same_version_skips_handler() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let body = vec![1, 2, 3];
let envelope = make_envelope("test::PlaceOrder", body.clone(), Some(2));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2)).unwrap();
let received = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*received, body);
}
#[test]
fn no_handler_registered_falls_through() {
let registry = bytes_registry();
let handlers = empty_handlers();
let body = vec![1, 2, 3];
let envelope = make_envelope("test::PlaceOrder", body.clone(), Some(1));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2)).unwrap();
let received = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*received, body);
}
#[test]
fn sender_version_none_skips_handler() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let body = vec![1, 2, 3];
let envelope = make_envelope("test::PlaceOrder", body.clone(), None);
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2)).unwrap();
let received = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*received, body);
}
#[test]
fn receiver_version_none_skips_handler() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let body = vec![1, 2, 3];
let envelope = make_envelope("test::PlaceOrder", body.clone(), Some(1));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, None).unwrap();
let received = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*received, body);
}
#[test]
fn unknown_message_type_with_version_mismatch_falls_through() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let body = vec![42];
let envelope = make_envelope("test::OtherMsg", body.clone(), Some(1));
let result =
receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2)).unwrap();
let received = result.downcast::<Vec<u8>>().unwrap();
assert_eq!(*received, body);
}
#[test]
fn unknown_version_pair_in_handler_rejected() {
let registry = bytes_registry();
let handlers = place_order_handlers();
let body = vec![1, 2, 3];
let envelope = make_envelope("test::PlaceOrder", body, Some(5));
let result = receive_envelope_body_versioned(&envelope, ®istry, &handlers, Some(2));
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.message.contains("cannot migrate from v5 to v2"));
}