use crate::epmd::{EpmdClient, NodeInfo};
use crate::serialization::{SerializableEnvelope, SerializableMessage, get_global_registry};
use crate::telemetry::DistributedMetrics;
use crate::{ActorSystem, Message, Pid};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
#[derive(Debug, Error)]
pub enum DistributedError {
#[error("EPMD error: {0}")]
EpmdError(#[from] crate::epmd::EpmdError),
#[error("Node not found: {0}")]
NodeNotFound(String),
#[error("Connection failed: {0}")]
ConnectionFailed(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Actor error: {0}")]
ActorError(#[from] crate::ActorError),
}
pub type Result<T> = std::result::Result<T, DistributedError>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HandshakeMessage {
pub version: u8,
pub node_name: String,
pub node_id: u32,
pub listen_address: String,
}
impl HandshakeMessage {
pub fn new(node_name: String, node_id: u32, listen_address: String) -> Self {
Self {
version: 1,
node_name,
node_id,
listen_address,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SystemRpc {
PingRequest { target: Pid, reply_to: Pid },
PongResponse { target: Pid, is_alive: bool },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkMessage {
ActorMessage {
to: Pid,
from: Pid,
payload: Vec<u8>,
},
SystemRpc(SystemRpc),
}
#[allow(dead_code)] struct NodeConnection {
node_info: NodeInfo,
stream: RwLock<Option<TcpStream>>,
}
impl NodeConnection {
async fn connect(
node_info: NodeInfo,
local_node_name: String,
local_node_id: u32,
local_listen_address: String,
) -> Result<Self> {
let addr = node_info.address();
let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
DistributedError::ConnectionFailed(format!("Failed to connect to {}: {}", addr, e))
})?;
info!("Connected to remote node {} at {}", node_info.name, addr);
let handshake = HandshakeMessage::new(local_node_name, local_node_id, local_listen_address);
let handshake_bytes = bincode::serialize(&handshake)
.map_err(|e| DistributedError::SerializationError(e.to_string()))?;
let len = (handshake_bytes.len() as u32).to_be_bytes();
stream.write_all(&len).await?;
stream.write_all(&handshake_bytes).await?;
stream.flush().await?;
debug!("Sent handshake to {}", node_info.name);
Ok(Self {
node_info,
stream: RwLock::new(Some(stream)),
})
}
#[allow(dead_code)] async fn send_message(&self, msg: &NetworkMessage) -> Result<()> {
#[cfg(feature = "telemetry")]
let start = std::time::Instant::now();
let mut stream_guard = self.stream.write().await;
if stream_guard.is_none() {
let addr = self.node_info.address();
match TcpStream::connect(&addr).await {
Ok(stream) => {
info!("Reconnected to {}", addr);
*stream_guard = Some(stream);
}
Err(e) => {
DistributedMetrics::remote_message_failed(
&self.node_info.name,
"reconnect_failed",
);
return Err(DistributedError::ConnectionFailed(format!(
"Failed to reconnect to {}: {}",
addr, e
)));
}
}
}
let msg_bytes = bincode::serialize(msg)
.map_err(|e| DistributedError::SerializationError(e.to_string()))?;
let len = (msg_bytes.len() as u32).to_be_bytes();
let stream = stream_guard.as_mut().unwrap();
if let Err(e) = stream.write_all(&len).await {
*stream_guard = None;
DistributedMetrics::remote_message_failed(&self.node_info.name, "write_failed");
return Err(e.into());
}
if let Err(e) = stream.write_all(&msg_bytes).await {
*stream_guard = None;
DistributedMetrics::remote_message_failed(&self.node_info.name, "write_failed");
return Err(e.into());
}
if let Err(e) = stream.flush().await {
*stream_guard = None;
DistributedMetrics::remote_message_failed(&self.node_info.name, "flush_failed");
return Err(e.into());
}
DistributedMetrics::remote_message_sent(&self.node_info.name);
#[cfg(feature = "telemetry")]
{
let duration = start.elapsed().as_secs_f64();
DistributedMetrics::network_latency(&self.node_info.name, duration);
}
match msg {
NetworkMessage::ActorMessage { to, .. } => {
debug!("Sent message to {} (pid: {})", self.node_info.name, to);
}
NetworkMessage::SystemRpc(_) => {
debug!("Sent system RPC to {}", self.node_info.name);
}
}
Ok(())
}
}
pub struct NodeRegistry {
connections_by_name: Arc<DashMap<String, Arc<NodeConnection>>>,
connections_by_id: Arc<DashMap<u32, Arc<NodeConnection>>>,
node_id_to_name: Arc<DashMap<u32, String>>,
connection_locks: Arc<DashMap<String, Arc<Mutex<()>>>>,
}
impl Default for NodeRegistry {
fn default() -> Self {
Self {
connections_by_name: Arc::new(DashMap::new()),
connections_by_id: Arc::new(DashMap::new()),
node_id_to_name: Arc::new(DashMap::new()),
connection_locks: Arc::new(DashMap::new()),
}
}
}
impl NodeRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn list_connected_nodes(&self) -> Vec<String> {
self.connections_by_name
.iter()
.map(|entry| entry.key().clone())
.collect()
}
pub fn get_node_name(&self, node_id: u32) -> Option<String> {
self.node_id_to_name.get(&node_id).map(|name| name.clone())
}
async fn get_or_connect(
&self,
node_info: NodeInfo,
local_node_name: String,
local_node_id: u32,
local_listen_address: String,
) -> Result<Arc<NodeConnection>> {
let node_name = node_info.name.clone();
if let Some(conn) = self.connections_by_name.get(&node_name) {
return Ok(Arc::clone(&*conn));
}
let lock = self
.connection_locks
.entry(node_name.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone();
let _guard = lock.lock().await;
if let Some(conn) = self.connections_by_name.get(&node_name) {
return Ok(Arc::clone(&*conn));
}
let conn = Arc::new(
NodeConnection::connect(
node_info,
local_node_name,
local_node_id,
local_listen_address,
)
.await?,
);
let node_id = ActorSystem::hash_node_name(&node_name);
self.connections_by_name
.insert(node_name.clone(), Arc::clone(&conn));
self.connections_by_id.insert(node_id, Arc::clone(&conn));
self.node_id_to_name.insert(node_id, node_name.clone());
DistributedMetrics::connection_established(&node_name);
DistributedMetrics::active_connections(self.connections_by_name.len());
info!(
"Established connection to node {} (id: {})",
node_name, node_id
);
Ok(conn)
}
async fn get_by_node_id(&self, node_id: u32) -> Result<Arc<NodeConnection>> {
self.connections_by_id
.get(&node_id)
.map(|conn| Arc::clone(&*conn))
.ok_or_else(|| {
let node_name = self
.node_id_to_name
.get(&node_id)
.map(|name| name.clone())
.unwrap_or_else(|| format!("unknown_node_{}", node_id));
DistributedError::NodeNotFound(node_name)
})
}
#[allow(dead_code)] fn remove(&self, node_name: &str) {
let node_id = ActorSystem::hash_node_name(node_name);
self.connections_by_name.remove(node_name);
self.connections_by_id.remove(&node_id);
self.node_id_to_name.remove(&node_id);
self.connection_locks.remove(node_name);
DistributedMetrics::connection_lost(node_name);
DistributedMetrics::active_connections(self.connections_by_name.len());
debug!("Removed connection to node {} (id: {})", node_name, node_id);
}
}
#[deprecated(
since = "0.5.0",
note = "Use ActorSystem::new_distributed() instead for a unified API"
)]
pub struct DistributedSystem {
system: Arc<ActorSystem>,
node_name: String,
node_id: u32,
epmd_client: EpmdClient,
node_registry: Arc<NodeRegistry>,
_listener_handle: Option<tokio::task::JoinHandle<()>>,
}
#[allow(deprecated)]
impl DistributedSystem {
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub async fn new(
node_name: impl Into<String>,
listen_address: impl Into<String>,
epmd_address: impl Into<String>,
) -> Result<Arc<Self>> {
let node_name = node_name.into();
let listen_address = listen_address.into();
let epmd_address = epmd_address.into();
let node_id = Self::hash_node_name(&node_name);
let system = ActorSystem::new();
let epmd_client = EpmdClient::new(epmd_address);
let parts: Vec<&str> = listen_address.split(':').collect();
let host = parts[0].to_string();
let port: u16 = parts.get(1).and_then(|p| p.parse().ok()).ok_or_else(|| {
DistributedError::ConnectionFailed(format!(
"Invalid listen address: {}",
listen_address
))
})?;
epmd_client.register(&node_name, &host, port).await?;
info!(
"Registered node {} with EPMD at {}:{} (node_id: {})",
node_name, host, port, node_id
);
epmd_client
.start_keep_alive_loop(node_name.clone(), Duration::from_secs(20))
.await;
let node_registry = Arc::new(NodeRegistry::new());
let listener = TcpListener::bind(&listen_address).await?;
info!("Listening for node connections on {}", listen_address);
let system_clone = Arc::clone(&system);
let listener_handle = tokio::spawn(async move {
Self::accept_connections(listener, system_clone).await;
});
Ok(Arc::new(Self {
system,
node_name,
node_id,
epmd_client,
node_registry,
_listener_handle: Some(listener_handle),
}))
}
#[deprecated(
since = "0.5.0",
note = "Use ActorSystem::new_distributed() directly instead"
)]
pub fn system(&self) -> &Arc<ActorSystem> {
&self.system
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub fn node_name(&self) -> &str {
&self.node_name
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub fn node_id(&self) -> u32 {
self.node_id
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub fn make_pid(&self, local_id: u64) -> Pid {
Pid {
node: self.node_id,
id: local_id,
}
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub async fn send(&self, from: Pid, to: Pid, msg: Message) -> Result<()> {
if to.node == 0 || to.node == self.node_id {
self.system.send(to, msg).await?;
Ok(())
} else {
self.send_remote(from, to, msg).await
}
}
async fn send_remote(&self, from: Pid, to: Pid, msg: Message) -> Result<()> {
let payload = serialize_message(&msg)?;
let net_msg = NetworkMessage::ActorMessage { to, from, payload };
let conn = self.node_registry.get_by_node_id(to.node).await?;
conn.send_message(&net_msg).await?;
debug!("Sent remote message from {} to {}", from, to);
Ok(())
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub async fn connect_to_node(&self, node_name: &str) -> Result<()> {
let node_info = self
.epmd_client
.lookup(node_name)
.await?
.ok_or_else(|| DistributedError::NodeNotFound(node_name.to_string()))?;
self.node_registry
.get_or_connect(
node_info,
self.node_name.clone(),
self.node_id,
format!("{}:{}", self.node_name, 0), )
.await?;
Ok(())
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub async fn list_nodes(&self) -> Result<Vec<NodeInfo>> {
Ok(self.epmd_client.list_nodes().await?)
}
#[deprecated(since = "0.5.0", note = "Use ActorSystem::new_distributed() instead")]
pub async fn shutdown(&self) -> Result<()> {
info!("Shutting down node {}", self.node_name);
self.epmd_client.unregister(&self.node_name).await?;
Ok(())
}
async fn accept_connections(listener: TcpListener, system: Arc<ActorSystem>) {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
debug!("Accepted connection from {}", addr);
let system_clone = Arc::clone(&system);
tokio::spawn(async move {
if let Err(e) = Self::handle_node_connection(stream, system_clone).await {
error!("Connection handler error: {}", e);
}
});
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
}
async fn handle_node_connection(
mut stream: TcpStream,
_system: Arc<ActorSystem>,
) -> Result<()> {
loop {
let mut len_buf = [0u8; 4];
match stream.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
debug!("Remote node disconnected");
return Ok(());
}
Err(e) => return Err(e.into()),
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > 10 * 1024 * 1024 {
return Err(DistributedError::SerializationError(
"Message too large".to_string(),
));
}
let mut msg_buf = vec![0u8; len];
stream.read_exact(&mut msg_buf).await?;
warn!("Using deprecated DistributedSystem message handling");
}
}
pub fn hash_node_name(name: &str) -> u32 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
name.hash(&mut hasher);
(hasher.finish() & 0xFFFFFFFF) as u32
}
}
fn serialize_message(msg: &Message) -> Result<Vec<u8>> {
let serializable = msg
.downcast_ref::<Box<dyn SerializableMessage>>()
.ok_or_else(|| {
DistributedMetrics::serialization_error();
DistributedError::SerializationError(
"Message does not implement SerializableMessage".to_string(),
)
})?;
let envelope = SerializableEnvelope::wrap(serializable.as_ref()).map_err(|e| {
DistributedMetrics::serialization_error();
DistributedError::SerializationError(e.to_string())
})?;
Ok(envelope.to_bytes())
}
fn deserialize_message(data: &[u8]) -> Result<Message> {
let envelope = SerializableEnvelope::from_bytes(data).map_err(|e| {
DistributedMetrics::serialization_error();
DistributedError::SerializationError(e.to_string())
})?;
let registry = get_global_registry();
let registry_guard = registry.read().unwrap();
let serializable = envelope.unwrap(®istry_guard).map_err(|e| {
DistributedMetrics::serialization_error();
DistributedError::SerializationError(e.to_string())
})?;
Ok(Box::new(serializable))
}
pub async fn send_remote(
node_registry: &Arc<NodeRegistry>,
from: Pid,
to: Pid,
msg: Message,
) -> Result<()> {
let payload = serialize_message(&msg)?;
let net_msg = NetworkMessage::ActorMessage { to, from, payload };
let conn = node_registry.get_by_node_id(to.node).await?;
conn.send_message(&net_msg).await?;
debug!("Sent remote message from {} to {}", from, to);
Ok(())
}
pub async fn connect_to_node(
epmd_client: &EpmdClient,
node_registry: &Arc<NodeRegistry>,
remote_node_name: &str,
local_node_name: &str,
local_node_id: u32,
local_listen_address: &str,
) -> Result<()> {
let node_info = epmd_client
.lookup(remote_node_name)
.await?
.ok_or_else(|| DistributedError::NodeNotFound(remote_node_name.to_string()))?;
node_registry
.get_or_connect(
node_info,
local_node_name.to_string(),
local_node_id,
local_listen_address.to_string(),
)
.await?;
info!("Connected to remote node {}", remote_node_name);
Ok(())
}
pub async fn ping_process(node_registry: &Arc<NodeRegistry>, pid: Pid) -> Result<bool> {
use tokio::time::{Duration, timeout};
let reply_to = Pid::new();
let ping_msg = NetworkMessage::SystemRpc(SystemRpc::PingRequest {
target: pid,
reply_to,
});
let conn = node_registry.get_by_node_id(pid.node()).await?;
conn.send_message(&ping_msg).await?;
match timeout(
Duration::from_secs(5),
tokio::time::sleep(Duration::from_millis(100)),
)
.await
{
Ok(_) => {
debug!("Ping sent to {} successfully", pid);
Ok(true)
}
Err(_) => {
debug!("Ping to {} timed out", pid);
Ok(false)
}
}
}
pub async fn accept_connections(
listener: TcpListener,
system: Arc<ActorSystem>,
_node_name: String,
_node_id: u32,
_listen_address: String,
) {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
debug!("Accepted connection from {}", addr);
let system_clone = Arc::clone(&system);
tokio::spawn(async move {
if let Err(e) = handle_node_connection(stream, system_clone).await {
error!("Connection handler error: {}", e);
}
});
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
}
async fn handle_node_connection(mut stream: TcpStream, system: Arc<ActorSystem>) -> Result<()> {
let handshake = read_handshake(&mut stream).await?;
info!(
"Received handshake from node {} (id: {}) at {}",
handshake.node_name, handshake.node_id, handshake.listen_address
);
if let (Some(local_name), Some(epmd), Some(registry), Some(local_addr)) = (
system.node(),
system.epmd_client(),
system.node_registry(),
system.listen_address(),
) {
let remote_name = handshake.node_name.clone();
if !registry.list_connected_nodes().contains(&remote_name) {
debug!("Establishing reverse connection to {}", remote_name);
if let Err(e) = connect_to_node(
epmd,
registry,
&remote_name,
local_name,
system.local_node_id(),
local_addr,
)
.await
{
warn!(
"Failed to establish reverse connection to {}: {}",
remote_name, e
);
} else {
info!("Established bidirectional connection with {}", remote_name);
}
}
}
loop {
let mut len_buf = [0u8; 4];
match stream.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
debug!("Remote node {} disconnected", handshake.node_name);
return Ok(());
}
Err(e) => return Err(e.into()),
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > 10 * 1024 * 1024 {
return Err(DistributedError::SerializationError(
"Message too large".to_string(),
));
}
let mut msg_buf = vec![0u8; len];
stream.read_exact(&mut msg_buf).await?;
let net_msg: NetworkMessage = bincode::deserialize(&msg_buf)
.map_err(|e| DistributedError::SerializationError(e.to_string()))?;
match net_msg {
NetworkMessage::ActorMessage { to, from, payload } => {
debug!("Received actor message from {} to {}", from, to);
match deserialize_message(&payload) {
Ok(msg) => {
if let Err(e) = system.send(to, msg).await {
warn!("Failed to deliver remote message to {}: {}", to, e);
}
}
Err(e) => {
error!("Failed to deserialize message payload from {}: {}", from, e);
}
}
}
NetworkMessage::SystemRpc(rpc) => {
handle_system_rpc(rpc, &system).await;
}
}
}
}
async fn read_handshake(stream: &mut TcpStream) -> Result<HandshakeMessage> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
if len > 1024 {
return Err(DistributedError::SerializationError(
"Handshake too large".to_string(),
));
}
let mut handshake_buf = vec![0u8; len];
stream.read_exact(&mut handshake_buf).await?;
bincode::deserialize(&handshake_buf)
.map_err(|e| DistributedError::SerializationError(format!("Invalid handshake: {}", e)))
}
async fn handle_system_rpc(rpc: SystemRpc, system: &Arc<ActorSystem>) {
match rpc {
SystemRpc::PingRequest { target, reply_to } => {
debug!("Received ping request for {} from {}", target, reply_to);
let is_alive = system.is_actor_alive(target);
let response = SystemRpc::PongResponse { target, is_alive };
let net_msg = NetworkMessage::SystemRpc(response);
if let Some(registry) = system.node_registry()
&& let Ok(conn) = registry.get_by_node_id(reply_to.node()).await
&& let Err(e) = conn.send_message(&net_msg).await
{
warn!("Failed to send pong response: {}", e);
}
}
SystemRpc::PongResponse { target, is_alive } => {
debug!("Received pong response for {}: alive={}", target, is_alive);
}
}
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use super::*;
use crate::serialization::{SerializableMessage, SerializationError, register_message_type};
use std::any::Any;
#[test]
fn test_hash_node_name() {
let id1 = DistributedSystem::hash_node_name("node_a");
let id2 = DistributedSystem::hash_node_name("node_b");
let id3 = DistributedSystem::hash_node_name("node_a");
assert_ne!(id1, id2);
assert_eq!(id1, id3);
}
#[test]
fn test_network_message_serialization() {
let msg = NetworkMessage::ActorMessage {
to: Pid { node: 1, id: 42 },
from: Pid { node: 2, id: 100 },
payload: vec![1, 2, 3, 4],
};
let bytes = bincode::serialize(&msg).unwrap();
let deserialized: NetworkMessage = bincode::deserialize(&bytes).unwrap();
match (msg, deserialized) {
(
NetworkMessage::ActorMessage {
to: to1,
from: from1,
payload: payload1,
},
NetworkMessage::ActorMessage {
to: to2,
from: from2,
payload: payload2,
},
) => {
assert_eq!(to1, to2);
assert_eq!(from1, from2);
assert_eq!(payload1, payload2);
}
_ => panic!("Expected ActorMessage variants"),
}
}
#[derive(Debug, Clone, PartialEq)]
struct TestMsg {
value: u32,
text: String,
}
impl SerializableMessage for TestMsg {
fn message_type_id(&self) -> &'static str {
"test::TestMsg"
}
fn as_any(&self) -> &dyn Any {
self
}
fn serialize(&self) -> std::result::Result<Vec<u8>, SerializationError> {
let mut bytes = self.value.to_le_bytes().to_vec();
bytes.extend_from_slice(self.text.as_bytes());
Ok(bytes)
}
}
fn deserialize_test_msg(
data: &[u8],
) -> std::result::Result<Box<dyn SerializableMessage>, SerializationError> {
if data.len() < 4 {
return Err(SerializationError::InvalidFormat("Too short".into()));
}
let value = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
let text = String::from_utf8(data[4..].to_vec())
.map_err(|e| SerializationError::DeserializeFailed(e.to_string()))?;
Ok(Box::new(TestMsg { value, text }))
}
#[test]
fn test_serialize_message_roundtrip() {
register_message_type("test::TestMsg", Box::new(deserialize_test_msg));
let original = TestMsg {
value: 42,
text: "Hello, World!".to_string(),
};
let msg: Message = Box::new(Box::new(original.clone()) as Box<dyn SerializableMessage>);
let serialized = serialize_message(&msg).expect("Serialization should succeed");
let deserialized =
deserialize_message(&serialized).expect("Deserialization should succeed");
let result = deserialized
.downcast_ref::<Box<dyn SerializableMessage>>()
.expect("Should downcast to SerializableMessage")
.as_ref()
.as_any()
.downcast_ref::<TestMsg>()
.expect("Should downcast to TestMsg");
assert_eq!(result.value, original.value);
assert_eq!(result.text, original.text);
}
#[test]
fn test_serialize_non_serializable_message() {
let msg: Message = Box::new("plain string");
let result = serialize_message(&msg);
assert!(result.is_err());
match result {
Err(DistributedError::SerializationError(err)) => {
assert!(err.contains("does not implement SerializableMessage"));
}
_ => panic!("Expected SerializationError"),
}
}
#[test]
fn test_deserialize_invalid_envelope() {
let invalid_data = vec![1, 2, 3];
let result = deserialize_message(&invalid_data);
assert!(result.is_err());
}
#[test]
fn test_deserialize_unknown_message_type() {
use crate::serialization::SerializableEnvelope;
struct UnknownMsg;
impl SerializableMessage for UnknownMsg {
fn message_type_id(&self) -> &'static str {
"test::UnknownMsg"
}
fn as_any(&self) -> &dyn Any {
self
}
fn serialize(&self) -> std::result::Result<Vec<u8>, SerializationError> {
Ok(vec![1, 2, 3, 4])
}
}
let envelope = SerializableEnvelope::wrap(&UnknownMsg).unwrap();
let bytes = envelope.to_bytes();
let result = deserialize_message(&bytes);
assert!(result.is_err());
}
}