use std::collections::{HashMap, HashSet};
use crate::node::{ActorId, NodeId};
use crate::remote::SerializationError;
/// Wire `message_type` tag for spawn requests.
///
/// This string is a wire protocol value; changing it breaks compatibility.
pub const SYSTEM_MSG_TYPE_SPAWN: &str = "dactor::system_actors::SpawnRequest";

/// Wire `message_type` tag for watch requests (wire protocol value).
pub const SYSTEM_MSG_TYPE_WATCH: &str = "dactor::system_actors::WatchRequest";

/// Wire `message_type` tag for unwatch requests (wire protocol value).
pub const SYSTEM_MSG_TYPE_UNWATCH: &str = "dactor::system_actors::UnwatchRequest";

/// Wire `message_type` tag for cancel requests (wire protocol value).
pub const SYSTEM_MSG_TYPE_CANCEL: &str = "dactor::system_actors::CancelRequest";

/// Wire `message_type` tag for connect-peer messages (wire protocol value).
pub const SYSTEM_MSG_TYPE_CONNECT_PEER: &str = "dactor::system_actors::ConnectPeer";

/// Wire `message_type` tag for disconnect-peer messages (wire protocol value).
pub const SYSTEM_MSG_TYPE_DISCONNECT_PEER: &str = "dactor::system_actors::DisconnectPeer";

/// Reports whether `message_type` is one of the `SYSTEM_MSG_TYPE_*` tags.
///
/// The comparison is an exact, case-sensitive string match; anything else
/// (including the empty string) yields `false`.
pub fn is_system_message_type(message_type: &str) -> bool {
    // Every tag this module treats as a system message.
    const SYSTEM_MESSAGE_TYPES: [&str; 6] = [
        SYSTEM_MSG_TYPE_SPAWN,
        SYSTEM_MSG_TYPE_WATCH,
        SYSTEM_MSG_TYPE_UNWATCH,
        SYSTEM_MSG_TYPE_CANCEL,
        SYSTEM_MSG_TYPE_CONNECT_PEER,
        SYSTEM_MSG_TYPE_DISCONNECT_PEER,
    ];

    SYSTEM_MESSAGE_TYPES
        .iter()
        .any(|known| *known == message_type)
}
/// Request describing an actor to be created.
///
/// Serde support is only derived when the `serde` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SpawnRequest {
/// Type name that `SpawnManager::create_actor` passes to the type registry.
pub type_name: String,
/// Opaque argument bytes that `SpawnManager::create_actor` passes to the
/// type registry alongside `type_name`.
pub args_bytes: Vec<u8>,
/// Name for the actor. Not read by any code in this file —
/// NOTE(review): confirm how the spawn handler uses it.
pub name: String,
/// Identifier of this request; presumably echoed back in
/// `SpawnResponse::request_id` — confirm against the handler.
pub request_id: String,
}
/// Outcome of a spawn request.
///
/// Serde support is only derived when the `serde` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SpawnResponse {
/// The actor was created.
Success {
/// Identifier of the request being answered (presumably the value from
/// `SpawnRequest::request_id` — confirm against the handler).
request_id: String,
/// Identifier of the spawned actor.
actor_id: ActorId,
},
/// The actor could not be created.
Failure {
/// Identifier of the request being answered (presumably the value from
/// `SpawnRequest::request_id` — confirm against the handler).
request_id: String,
/// Human-readable description of the failure.
error: String,
},
}
/// Creates actors through a type registry and keeps a list of recorded
/// actor ids.
pub struct SpawnManager {
// Registry used by `create_actor` to build an actor from a type name and bytes.
type_registry: crate::type_registry::TypeRegistry,
// Ids passed to `record_spawn`, in call order; nothing in this file removes entries.
spawned: Vec<ActorId>,
}
impl SpawnManager {
pub fn new(type_registry: crate::type_registry::TypeRegistry) -> Self {
Self {
type_registry,
spawned: Vec::new(),
}
}
pub fn create_actor(
&self,
request: &SpawnRequest,
) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
self.type_registry
.create_actor(&request.type_name, &request.args_bytes)
}
pub fn record_spawn(&mut self, id: ActorId) {
self.spawned.push(id);
}
pub fn spawned_actors(&self) -> &[ActorId] {
&self.spawned
}
pub fn type_registry(&self) -> &crate::type_registry::TypeRegistry {
&self.type_registry
}
pub fn type_registry_mut(&mut self) -> &mut crate::type_registry::TypeRegistry {
&mut self.type_registry
}
}
/// Request naming a `watcher` actor and the `target` actor it wants to watch.
///
/// NOTE(review): the handler is not in this file; presumably it forwards to
/// `WatchManager::watch` — confirm.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WatchRequest {
/// Actor to be watched.
pub target: ActorId,
/// Actor that wants to watch `target`.
pub watcher: ActorId,
}
/// Request naming a `watcher` actor and the `target` actor it no longer
/// wants to watch.
///
/// NOTE(review): the handler is not in this file; presumably it forwards to
/// `WatchManager::unwatch` — confirm.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct UnwatchRequest {
/// Actor that was being watched.
pub target: ActorId,
/// Actor that wants to stop watching `target`.
pub watcher: ActorId,
}
/// Notification produced by `WatchManager::on_terminated`: one per watcher
/// registered for the terminated actor.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WatchNotification {
/// The actor that terminated.
pub terminated: ActorId,
/// The watcher this notification is addressed to.
pub watcher: ActorId,
}
/// Tracks which actors are watching which targets.
pub struct WatchManager {
// Maps a watched target to the set of its watchers. `unwatch` drops a
// target's entry once its set becomes empty.
watchers: HashMap<ActorId, HashSet<ActorId>>,
}
impl WatchManager {
pub fn new() -> Self {
Self {
watchers: HashMap::new(),
}
}
pub fn watch(&mut self, target: ActorId, watcher: ActorId) {
self.watchers.entry(target).or_default().insert(watcher);
}
pub fn unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
if let Some(set) = self.watchers.get_mut(target) {
set.remove(watcher);
if set.is_empty() {
self.watchers.remove(target);
}
}
}
pub fn on_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
self.watchers
.remove(terminated)
.unwrap_or_default()
.into_iter()
.map(|watcher| WatchNotification {
terminated: terminated.clone(),
watcher,
})
.collect()
}
pub fn watchers_of(&self, target: &ActorId) -> Vec<ActorId> {
self.watchers
.get(target)
.map(|s| s.iter().cloned().collect())
.unwrap_or_default()
}
pub fn watched_count(&self) -> usize {
self.watchers.len()
}
}
impl Default for WatchManager {
fn default() -> Self {
Self::new()
}
}
/// Request to cancel work associated with an actor.
///
/// NOTE(review): the handler is not in this file; presumably `request_id`
/// is looked up via `CancelManager::cancel` — confirm.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CancelRequest {
/// Actor the cancellation is addressed to.
pub target: ActorId,
/// Identifier of the request to cancel, if any.
/// NOTE(review): the meaning of `None` is not established in this file.
pub request_id: Option<String>,
}
/// Result of a cancellation attempt, as returned by `CancelManager::cancel`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CancelResponse {
/// A token was registered under the request id and has been cancelled.
Acknowledged,
/// No token was registered under the request id.
NotFound {
/// Human-readable explanation; `CancelManager::cancel` includes the request id.
reason: String,
},
}
/// Keeps cancellation tokens for in-flight requests, keyed by request id.
pub struct CancelManager {
// Request id -> token. Entries are removed by `cancel` and `remove`.
tokens: HashMap<String, tokio_util::sync::CancellationToken>,
}
impl CancelManager {
    /// Creates a manager with no registered tokens.
    pub fn new() -> Self {
        let tokens = HashMap::new();
        Self { tokens }
    }

    /// Stores `token` under `request_id`.
    ///
    /// A token already stored under the same id is replaced and dropped
    /// without being cancelled.
    pub fn register(&mut self, request_id: String, token: tokio_util::sync::CancellationToken) {
        let _replaced = self.tokens.insert(request_id, token);
    }

    /// Cancels and forgets the token registered under `request_id`.
    ///
    /// Returns [`CancelResponse::Acknowledged`] when a token was found, and
    /// [`CancelResponse::NotFound`] otherwise.
    pub fn cancel(&mut self, request_id: &str) -> CancelResponse {
        match self.tokens.remove(request_id) {
            Some(token) => {
                token.cancel();
                CancelResponse::Acknowledged
            }
            None => CancelResponse::NotFound {
                reason: format!("no active request with id '{request_id}'"),
            },
        }
    }

    /// Forgets the token registered under `request_id` without cancelling it.
    pub fn remove(&mut self, request_id: &str) {
        let _forgotten = self.tokens.remove(request_id);
    }

    /// Number of tokens currently registered.
    pub fn active_count(&self) -> usize {
        self.tokens.len()
    }
}
impl Default for CancelManager {
fn default() -> Self {
Self::new()
}
}
/// Connection status of a peer node as tracked by `NodeDirectory`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum PeerStatus {
/// Initial status assigned by `NodeDirectory::add_peer`.
Connecting,
/// The only status for which `NodeDirectory::is_connected` returns `true`.
Connected,
/// NOTE(review): presumably the peer could not be reached — semantics are
/// not established in this file.
Unreachable,
/// NOTE(review): presumably the connection was closed — semantics are not
/// established in this file.
Disconnected,
}
/// Directory entry describing one peer node.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Identifier of the peer; `NodeDirectory::add_peer` also uses it as the map key.
pub node_id: NodeId,
/// Current status; updated through `NodeDirectory::set_status`.
pub status: PeerStatus,
/// Optional address string supplied to `NodeDirectory::add_peer`.
pub address: Option<String>,
}
/// Directory of known peer nodes and their connection status.
pub struct NodeDirectory {
// Peer entries keyed by node id.
peers: HashMap<NodeId, PeerInfo>,
}
impl NodeDirectory {
    /// Creates an empty directory.
    pub fn new() -> Self {
        let peers = HashMap::new();
        Self { peers }
    }

    /// Inserts a peer with status [`PeerStatus::Connecting`].
    ///
    /// An existing entry for the same `node_id` is replaced, which resets its
    /// status and address.
    pub fn add_peer(&mut self, node_id: NodeId, address: Option<String>) {
        let info = PeerInfo {
            node_id: node_id.clone(),
            status: PeerStatus::Connecting,
            address,
        };
        self.peers.insert(node_id, info);
    }

    /// Updates the status of a known peer; unknown peers are ignored.
    pub fn set_status(&mut self, node_id: &NodeId, status: PeerStatus) {
        if let Some(PeerInfo { status: current, .. }) = self.peers.get_mut(node_id) {
            *current = status;
        }
    }

    /// Removes the entry for `node_id`, if any.
    pub fn remove_peer(&mut self, node_id: &NodeId) {
        let _removed = self.peers.remove(node_id);
    }

    /// Looks up the entry for `node_id`.
    pub fn get_peer(&self, node_id: &NodeId) -> Option<&PeerInfo> {
        self.peers.get(node_id)
    }

    /// Ids of all known peers, in the map's iteration order (unspecified).
    pub fn peer_nodes(&self) -> Vec<NodeId> {
        let ids = self.peers.keys();
        ids.cloned().collect()
    }

    /// Entries whose status equals `status`, in the map's iteration order.
    pub fn peers_with_status(&self, status: PeerStatus) -> Vec<&PeerInfo> {
        let mut matching = Vec::new();
        for info in self.peers.values() {
            if info.status == status {
                matching.push(info);
            }
        }
        matching
    }

    /// Total number of known peers, regardless of status.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }

    /// Number of peers whose status is [`PeerStatus::Connected`].
    pub fn connected_count(&self) -> usize {
        self.peers
            .values()
            .filter(|info| matches!(info.status, PeerStatus::Connected))
            .count()
    }

    /// `true` only when `node_id` is known and its status is
    /// [`PeerStatus::Connected`].
    pub fn is_connected(&self, node_id: &NodeId) -> bool {
        matches!(
            self.peers.get(node_id),
            Some(info) if info.status == PeerStatus::Connected
        )
    }
}
impl Default for NodeDirectory {
fn default() -> Self {
Self::new()
}
}
/// Information one node presents about itself during a handshake.
///
/// `validate_handshake` compares two of these (local and remote).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HandshakeRequest {
/// Identifier of the node describing itself.
pub node_id: NodeId,
/// Wire protocol version; checked with `WireVersion::is_compatible` in
/// `validate_handshake`.
pub wire_version: crate::version::WireVersion,
/// Optional application version. `validate_handshake` does not compare it;
/// it only copies the local value into an `Accepted` response.
pub app_version: Option<String>,
/// Adapter name; `validate_handshake` requires local and remote to be equal.
pub adapter: String,
}
impl HandshakeRequest {
    /// Builds a request whose wire version is parsed from
    /// `crate::version::DACTOR_WIRE_VERSION`.
    ///
    /// # Panics
    ///
    /// Panics if `DACTOR_WIRE_VERSION` cannot be parsed as a wire version.
    pub fn from_runtime(
        node_id: NodeId,
        app_version: Option<String>,
        adapter: impl Into<String>,
    ) -> Self {
        // Parsed first so a malformed constant fails before anything else runs.
        let wire_version =
            crate::version::WireVersion::parse(crate::version::DACTOR_WIRE_VERSION)
                .expect("DACTOR_WIRE_VERSION must be valid");
        let adapter: String = adapter.into();

        Self {
            node_id,
            wire_version,
            app_version,
            adapter,
        }
    }
}
/// Result of validating a handshake, as produced by `validate_handshake`.
///
/// In both variants the `node_id` and `wire_version` set by
/// `validate_handshake` are those of the responding (local) side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandshakeResponse {
/// The remote side passed the wire-version and adapter checks.
Accepted {
/// Identifier of the responding node.
node_id: NodeId,
/// Wire version of the responding node.
wire_version: crate::version::WireVersion,
/// Application version of the responding node, if any.
app_version: Option<String>,
/// Adapter name of the responding node.
adapter: String,
},
/// The remote side failed one of the checks.
Rejected {
/// Identifier of the responding node.
node_id: NodeId,
/// Wire version of the responding node.
wire_version: crate::version::WireVersion,
/// Which check failed.
reason: RejectionReason,
/// Human-readable explanation naming the mismatching values.
detail: String,
},
}
/// Category of check that caused a handshake to be rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectionReason {
    /// The wire protocol versions are not compatible.
    IncompatibleProtocol,
    /// The adapter names differ.
    IncompatibleAdapter,
}

impl std::fmt::Display for RejectionReason {
    /// Writes a short lowercase description of the reason.
    ///
    /// Width and padding flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            Self::IncompatibleProtocol => "incompatible wire protocol",
            Self::IncompatibleAdapter => "incompatible adapter",
        };
        f.write_str(description)
    }
}
/// Checks a remote handshake request against the local one and builds the
/// local node's response.
///
/// Checks run in this order, and the first failure decides the rejection:
/// 1. wire version compatibility (`WireVersion::is_compatible`),
/// 2. adapter name equality.
///
/// `app_version` is not compared. Every response carries the *local* node id
/// and wire version; an `Accepted` response additionally carries the local
/// app version and adapter.
pub fn validate_handshake(
    local: &HandshakeRequest,
    remote: &HandshakeRequest,
) -> HandshakeResponse {
    // Decide first whether (and why) to reject, then build the response once.
    let rejection = if !local.wire_version.is_compatible(&remote.wire_version) {
        Some((
            RejectionReason::IncompatibleProtocol,
            format!(
                "wire version {remote} is incompatible with local {local} \
                 (different MAJOR version)",
                remote = remote.wire_version,
                local = local.wire_version,
            ),
        ))
    } else if local.adapter != remote.adapter {
        Some((
            RejectionReason::IncompatibleAdapter,
            format!(
                "adapter \"{remote}\" does not match local \"{local}\"",
                remote = remote.adapter,
                local = local.adapter,
            ),
        ))
    } else {
        None
    };

    match rejection {
        Some((reason, detail)) => HandshakeResponse::Rejected {
            node_id: local.node_id.clone(),
            wire_version: local.wire_version,
            reason,
            detail,
        },
        None => HandshakeResponse::Accepted {
            node_id: local.node_id.clone(),
            wire_version: local.wire_version,
            app_version: local.app_version.clone(),
            adapter: local.adapter.clone(),
        },
    }
}
/// Confirms that an accepted handshake response came from the expected node.
///
/// Only `Accepted` responses are identity-checked. A `Rejected` response
/// always yields `Ok(())`, so callers must handle rejection themselves.
///
/// # Errors
///
/// Returns a message containing both ids when an `Accepted` response carries
/// a `node_id` different from `expected`.
pub fn verify_peer_identity(
    expected: &NodeId,
    response: &HandshakeResponse,
) -> Result<(), String> {
    match response {
        HandshakeResponse::Accepted { node_id, .. } if node_id != expected => Err(format!(
            "peer identity mismatch: expected {expected}, got {node_id}"
        )),
        // Matching identity, or a rejection (which is not checked here).
        HandshakeResponse::Accepted { .. } | HandshakeResponse::Rejected { .. } => Ok(()),
    }
}
// Unit tests for the spawn, watch, cancel, node-directory and handshake
// helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// A registered factory is invoked with the request's bytes and its boxed
// result can be downcast to the concrete type.
#[test]
fn spawn_manager_create_actor() {
let mut registry = crate::type_registry::TypeRegistry::new();
registry.register_factory("test::Worker", |bytes: &[u8]| {
if bytes.len() != 8 {
return Err(SerializationError::new("expected 8 bytes"));
}
let val = u64::from_be_bytes(bytes.try_into().unwrap());
Ok(Box::new(val))
});
let manager = SpawnManager::new(registry);
let request = SpawnRequest {
type_name: "test::Worker".into(),
args_bytes: 42u64.to_be_bytes().to_vec(),
name: "worker-1".into(),
request_id: "req-1".into(),
};
let actor = manager.create_actor(&request).unwrap();
let val = actor.downcast::<u64>().unwrap();
assert_eq!(*val, 42);
}
// A type name with no registered factory yields an error.
#[test]
fn spawn_manager_unknown_type() {
let registry = crate::type_registry::TypeRegistry::new();
let manager = SpawnManager::new(registry);
let request = SpawnRequest {
type_name: "unknown::Type".into(),
args_bytes: vec![],
name: "x".into(),
request_id: "req-2".into(),
};
let result = manager.create_actor(&request);
assert!(result.is_err());
}
// record_spawn appends to the list returned by spawned_actors.
#[test]
fn spawn_manager_records_spawned() {
let registry = crate::type_registry::TypeRegistry::new();
let mut manager = SpawnManager::new(registry);
assert!(manager.spawned_actors().is_empty());
let id = ActorId {
node: NodeId("n1".into()),
local: 1,
};
manager.record_spawn(id.clone());
assert_eq!(manager.spawned_actors().len(), 1);
assert_eq!(manager.spawned_actors()[0], id);
}
// Terminating a watched actor notifies its watcher and clears the entry.
#[test]
fn watch_and_terminate() {
let mut wm = WatchManager::new();
let target = ActorId {
node: NodeId("n1".into()),
local: 1,
};
let watcher = ActorId {
node: NodeId("n2".into()),
local: 10,
};
wm.watch(target.clone(), watcher.clone());
assert_eq!(wm.watched_count(), 1);
assert_eq!(wm.watchers_of(&target).len(), 1);
let notifications = wm.on_terminated(&target);
assert_eq!(notifications.len(), 1);
assert_eq!(notifications[0].terminated, target);
assert_eq!(notifications[0].watcher, watcher);
assert_eq!(wm.watched_count(), 0);
}
// Each watcher of a target receives its own notification.
#[test]
fn watch_multiple_watchers() {
let mut wm = WatchManager::new();
let target = ActorId {
node: NodeId("n1".into()),
local: 1,
};
let w1 = ActorId {
node: NodeId("n2".into()),
local: 10,
};
let w2 = ActorId {
node: NodeId("n3".into()),
local: 20,
};
wm.watch(target.clone(), w1.clone());
wm.watch(target.clone(), w2.clone());
assert_eq!(wm.watchers_of(&target).len(), 2);
let notifications = wm.on_terminated(&target);
assert_eq!(notifications.len(), 2);
}
// Removing the only watcher drops the target entry entirely.
#[test]
fn unwatch_removes_subscription() {
let mut wm = WatchManager::new();
let target = ActorId {
node: NodeId("n1".into()),
local: 1,
};
let watcher = ActorId {
node: NodeId("n2".into()),
local: 10,
};
wm.watch(target.clone(), watcher.clone());
wm.unwatch(&target, &watcher);
assert_eq!(wm.watched_count(), 0);
let notifications = wm.on_terminated(&target);
assert!(notifications.is_empty());
}
// Terminating an actor nobody watches produces no notifications.
#[test]
fn terminate_unwatched_actor_returns_empty() {
let mut wm = WatchManager::new();
let target = ActorId {
node: NodeId("n1".into()),
local: 99,
};
let notifications = wm.on_terminated(&target);
assert!(notifications.is_empty());
}
// cancel() trips the registered token and removes it from the manager.
#[test]
fn cancel_registered_request() {
let mut cm = CancelManager::new();
let token = tokio_util::sync::CancellationToken::new();
let token_check = token.clone();
cm.register("req-1".into(), token);
assert_eq!(cm.active_count(), 1);
let response = cm.cancel("req-1");
assert!(matches!(response, CancelResponse::Acknowledged));
assert!(token_check.is_cancelled());
assert_eq!(cm.active_count(), 0);
}
// Cancelling an id that was never registered reports NotFound.
#[test]
fn cancel_unknown_request_returns_not_found() {
let mut cm = CancelManager::new();
let response = cm.cancel("nonexistent");
assert!(matches!(response, CancelResponse::NotFound { .. }));
}
// remove() forgets a token without going through cancel().
#[test]
fn remove_cleans_up_token() {
let mut cm = CancelManager::new();
let token = tokio_util::sync::CancellationToken::new();
cm.register("req-1".into(), token);
assert_eq!(cm.active_count(), 1);
cm.remove("req-1");
assert_eq!(cm.active_count(), 0);
}
// Newly added peers start in Connecting and keep their address.
#[test]
fn add_and_query_peers() {
let mut dir = NodeDirectory::new();
dir.add_peer(NodeId("n1".into()), Some("10.0.0.1:4697".into()));
dir.add_peer(NodeId("n2".into()), None);
assert_eq!(dir.peer_count(), 2);
assert!(!dir.is_connected(&NodeId("n1".into())));
let info = dir.get_peer(&NodeId("n1".into())).unwrap();
assert_eq!(info.status, PeerStatus::Connecting);
assert_eq!(info.address.as_deref(), Some("10.0.0.1:4697"));
}
// Status updates are reflected by the count, lookup and filter helpers.
#[test]
fn set_status_and_filter() {
let mut dir = NodeDirectory::new();
dir.add_peer(NodeId("n1".into()), None);
dir.add_peer(NodeId("n2".into()), None);
dir.add_peer(NodeId("n3".into()), None);
dir.set_status(&NodeId("n1".into()), PeerStatus::Connected);
dir.set_status(&NodeId("n2".into()), PeerStatus::Connected);
dir.set_status(&NodeId("n3".into()), PeerStatus::Unreachable);
assert_eq!(dir.connected_count(), 2);
assert!(dir.is_connected(&NodeId("n1".into())));
assert!(!dir.is_connected(&NodeId("n3".into())));
let connected = dir.peers_with_status(PeerStatus::Connected);
assert_eq!(connected.len(), 2);
let unreachable = dir.peers_with_status(PeerStatus::Unreachable);
assert_eq!(unreachable.len(), 1);
}
// A removed peer can no longer be looked up.
#[test]
fn remove_peer() {
let mut dir = NodeDirectory::new();
dir.add_peer(NodeId("n1".into()), None);
assert_eq!(dir.peer_count(), 1);
dir.remove_peer(&NodeId("n1".into()));
assert_eq!(dir.peer_count(), 0);
assert!(dir.get_peer(&NodeId("n1".into())).is_none());
}
// peer_nodes lists every known peer id.
#[test]
fn peer_nodes_list() {
let mut dir = NodeDirectory::new();
dir.add_peer(NodeId("n1".into()), None);
dir.add_peer(NodeId("n2".into()), None);
let nodes = dir.peer_nodes();
assert_eq!(nodes.len(), 2);
}
// Both SpawnResponse variants can be constructed and matched.
#[test]
fn spawn_response_variants() {
let success = SpawnResponse::Success {
request_id: "r1".into(),
actor_id: ActorId {
node: NodeId("n1".into()),
local: 42,
},
};
assert!(matches!(success, SpawnResponse::Success { .. }));
let failure = SpawnResponse::Failure {
request_id: "r2".into(),
error: "type not found".into(),
};
assert!(matches!(failure, SpawnResponse::Failure { .. }));
}
// WatchNotification exposes the terminated and watcher ids as fields.
#[test]
fn watch_notification_fields() {
let notif = WatchNotification {
terminated: ActorId {
node: NodeId("n1".into()),
local: 1,
},
watcher: ActorId {
node: NodeId("n2".into()),
local: 2,
},
};
assert_eq!(notif.terminated.local, 1);
assert_eq!(notif.watcher.local, 2);
}
// set_status accepts every status in sequence for the same peer.
#[test]
fn peer_status_transitions() {
let mut dir = NodeDirectory::new();
dir.add_peer(NodeId("n1".into()), None);
assert_eq!(
dir.get_peer(&NodeId("n1".into())).unwrap().status,
PeerStatus::Connecting
);
dir.set_status(&NodeId("n1".into()), PeerStatus::Connected);
assert_eq!(
dir.get_peer(&NodeId("n1".into())).unwrap().status,
PeerStatus::Connected
);
dir.set_status(&NodeId("n1".into()), PeerStatus::Unreachable);
assert_eq!(
dir.get_peer(&NodeId("n1".into())).unwrap().status,
PeerStatus::Unreachable
);
dir.set_status(&NodeId("n1".into()), PeerStatus::Disconnected);
assert_eq!(
dir.get_peer(&NodeId("n1".into())).unwrap().status,
PeerStatus::Disconnected
);
}
// Guards the message-type strings: they are wire protocol values, so an
// accidental edit to a constant must fail this test.
#[test]
fn wire_protocol_constants_are_stable() {
assert_eq!(
SYSTEM_MSG_TYPE_SPAWN,
"dactor::system_actors::SpawnRequest",
"SYSTEM_MSG_TYPE_SPAWN is a wire protocol value — do not change"
);
assert_eq!(
SYSTEM_MSG_TYPE_WATCH,
"dactor::system_actors::WatchRequest",
"SYSTEM_MSG_TYPE_WATCH is a wire protocol value — do not change"
);
assert_eq!(
SYSTEM_MSG_TYPE_UNWATCH,
"dactor::system_actors::UnwatchRequest",
"SYSTEM_MSG_TYPE_UNWATCH is a wire protocol value — do not change"
);
assert_eq!(
SYSTEM_MSG_TYPE_CANCEL,
"dactor::system_actors::CancelRequest",
"SYSTEM_MSG_TYPE_CANCEL is a wire protocol value — do not change"
);
assert_eq!(
SYSTEM_MSG_TYPE_CONNECT_PEER,
"dactor::system_actors::ConnectPeer",
"SYSTEM_MSG_TYPE_CONNECT_PEER is a wire protocol value — do not change"
);
assert_eq!(
SYSTEM_MSG_TYPE_DISCONNECT_PEER,
"dactor::system_actors::DisconnectPeer",
"SYSTEM_MSG_TYPE_DISCONNECT_PEER is a wire protocol value — do not change"
);
}
// Test helper: builds a HandshakeRequest from string literals; panics if
// `wire` does not parse as a wire version.
fn make_handshake_request(
node_id: &str,
wire: &str,
adapter: &str,
app_version: Option<&str>,
) -> HandshakeRequest {
HandshakeRequest {
node_id: NodeId(node_id.into()),
wire_version: crate::version::WireVersion::parse(wire).unwrap(),
app_version: app_version.map(|s| s.into()),
adapter: adapter.into(),
}
}
// Identical wire version and adapter are accepted.
#[test]
fn handshake_same_version_and_adapter_accepted() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
let remote = make_handshake_request("node-b", "0.2.0", "ractor", None);
let resp = validate_handshake(&local, &remote);
assert!(matches!(resp, HandshakeResponse::Accepted { .. }));
}
// Wire versions 0.2.0 and 0.3.0 are treated as compatible.
#[test]
fn handshake_same_major_different_minor_accepted() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
let remote = make_handshake_request("node-b", "0.3.0", "ractor", None);
let resp = validate_handshake(&local, &remote);
assert!(matches!(resp, HandshakeResponse::Accepted { .. }));
}
// Wire versions 0.2.0 and 1.0.0 are rejected as incompatible protocol.
#[test]
fn handshake_different_major_rejected() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
let remote = make_handshake_request("node-b", "1.0.0", "ractor", None);
let resp = validate_handshake(&local, &remote);
match resp {
HandshakeResponse::Rejected { reason, detail, .. } => {
assert_eq!(reason, RejectionReason::IncompatibleProtocol);
assert!(detail.contains("MAJOR"));
}
_ => panic!("expected Rejected"),
}
}
// Differing adapter names are rejected and both names appear in the detail.
#[test]
fn handshake_different_adapter_rejected() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
let remote = make_handshake_request("node-b", "0.2.0", "kameo", None);
let resp = validate_handshake(&local, &remote);
match resp {
HandshakeResponse::Rejected { reason, detail, .. } => {
assert_eq!(reason, RejectionReason::IncompatibleAdapter);
assert!(detail.contains("kameo"));
assert!(detail.contains("ractor"));
}
_ => panic!("expected Rejected"),
}
}
// When both checks would fail, the protocol check wins.
#[test]
fn handshake_protocol_checked_before_adapter() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
let remote = make_handshake_request("node-b", "1.0.0", "kameo", None);
let resp = validate_handshake(&local, &remote);
match resp {
HandshakeResponse::Rejected { reason, .. } => {
assert_eq!(reason, RejectionReason::IncompatibleProtocol);
}
_ => panic!("expected Rejected"),
}
}
// An Accepted response describes the local node, not the remote one.
#[test]
fn handshake_accepted_carries_local_info() {
let local = make_handshake_request("node-a", "0.2.0", "ractor", Some("1.0.0"));
let remote = make_handshake_request("node-b", "0.2.0", "ractor", Some("2.0.0"));
match validate_handshake(&local, &remote) {
HandshakeResponse::Accepted {
node_id,
wire_version,
app_version,
adapter,
} => {
assert_eq!(node_id, NodeId("node-a".into()));
assert_eq!(wire_version.to_string(), "0.2.0");
assert_eq!(app_version.as_deref(), Some("1.0.0"));
assert_eq!(adapter, "ractor");
}
_ => panic!("expected Accepted"),
}
}
// Display output of each rejection reason.
#[test]
fn rejection_reason_display() {
assert_eq!(
RejectionReason::IncompatibleProtocol.to_string(),
"incompatible wire protocol"
);
assert_eq!(
RejectionReason::IncompatibleAdapter.to_string(),
"incompatible adapter"
);
}
// An Accepted response from the expected node passes verification.
#[test]
fn verify_peer_identity_matching() {
let resp = HandshakeResponse::Accepted {
node_id: NodeId("node-2".into()),
wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
app_version: None,
adapter: "ractor".into(),
};
assert!(verify_peer_identity(&NodeId("node-2".into()), &resp).is_ok());
}
// An Accepted response from a different node fails with a mismatch message.
#[test]
fn verify_peer_identity_mismatch() {
let resp = HandshakeResponse::Accepted {
node_id: NodeId("node-X".into()),
wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
app_version: None,
adapter: "ractor".into(),
};
let result = verify_peer_identity(&NodeId("node-2".into()), &resp);
assert!(result.is_err());
assert!(result.unwrap_err().contains("mismatch"));
}
// Pins the deliberate behavior that Rejected responses are not
// identity-checked and therefore return Ok.
#[test]
fn verify_peer_identity_rejected_is_ok() {
let resp = HandshakeResponse::Rejected {
node_id: NodeId("node-2".into()),
wire_version: crate::version::WireVersion::parse("1.0.0").unwrap(),
reason: RejectionReason::IncompatibleProtocol,
detail: "test".into(),
};
assert!(verify_peer_identity(&NodeId("node-2".into()), &resp).is_ok());
}
}