use std::{
borrow::Cow,
collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
fmt,
num::NonZero,
str::{self, FromStr},
sync::Arc,
task,
time::{Duration, Instant},
};
use libp2p::{
Multiaddr, PeerId, StreamProtocol,
kad::{self, StoreInserts, store::RecordStore},
swarm::{
ConnectionDenied, ConnectionId, DialError, DialFailure, FromSwarm, NetworkBehaviour,
NewExternalAddrOfPeer, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
behaviour::ConnectionEstablished,
},
};
use tokio::sync::{mpsc, oneshot};
use crate::{
actor::{ActorId, ActorIdFromBytesError},
error::RegistryError,
};
const PROTO_NAME: StreamProtocol = StreamProtocol::new("/kameo/registry/1.0.0");
type RegisterResult = Result<(), RegistryError>;
pub(super) type LookupResult = Result<ActorRegistration<'static>, RegistryError>;
type LookupLocalResult = Result<Option<ActorRegistration<'static>>, RegistryError>;
pub(super) type RegisterReply = oneshot::Sender<RegisterResult>;
pub(super) type LookupReply = mpsc::UnboundedSender<LookupResult>;
pub(super) type LookupLocalReply = oneshot::Sender<LookupLocalResult>;
pub(super) type UnregisterReply = oneshot::Sender<()>;
#[derive(Debug)]
pub enum Event {
LookupProgressed {
provider_query_id: kad::QueryId,
get_query_id: kad::QueryId,
result: Result<ActorRegistration<'static>, RegistryError>,
},
LookupTimeout {
provider_query_id: kad::QueryId,
},
LookupCompleted {
provider_query_id: kad::QueryId,
},
RegistrationFailed {
provider_query_id: kad::QueryId,
error: RegistryError,
},
RegisteredActor {
provider_result: kad::AddProviderResult,
provider_query_id: kad::QueryId,
metadata_result: kad::PutRecordResult,
metadata_query_id: kad::QueryId,
},
RoutingUpdated {
peer: PeerId,
is_new_peer: bool,
addresses: kad::Addresses,
bucket_range: (kad::KBucketDistance, kad::KBucketDistance),
old_peer: Option<PeerId>,
},
UnroutablePeer {
peer: PeerId,
},
RoutablePeer {
peer: PeerId,
address: Multiaddr,
},
PendingRoutablePeer {
peer: PeerId,
address: Multiaddr,
},
}
#[allow(missing_debug_implementations)]
pub struct Behaviour {
kademlia: kad::Behaviour<kad::store::MemoryStore>,
local_peer_id: PeerId,
pending_peers: HashMap<(PeerId, Multiaddr), Instant>,
pending_events: VecDeque<Event>,
registration_queries: HashMap<kad::QueryId, RegistrationQuery>,
lookup_queries: HashMap<kad::QueryId, LookupQuery>,
}
impl Behaviour {
pub fn new(local_peer_id: PeerId) -> Self {
let mut config = kad::Config::new(PROTO_NAME);
config.set_query_timeout(Duration::from_secs(10));
config.set_replication_factor(NonZero::new(5).unwrap());
config.set_record_ttl(Some(Duration::from_secs(3600)));
config.set_publication_interval(Some(Duration::from_secs(1800)));
config.set_record_filtering(StoreInserts::FilterBoth);
let mut kademlia = kad::Behaviour::with_config(
local_peer_id,
kad::store::MemoryStore::new(local_peer_id),
config,
);
kademlia.set_mode(Some(kad::Mode::Server));
Behaviour {
kademlia,
local_peer_id,
pending_peers: HashMap::new(),
pending_events: VecDeque::new(),
registration_queries: HashMap::new(),
lookup_queries: HashMap::new(),
}
}
pub fn register(
&mut self,
name: impl Into<Arc<str>>,
registration: ActorRegistration<'static>,
) -> Result<kad::QueryId, kad::store::Error> {
self.register_with_reply(name.into(), registration, None)
.map_err(|(_, err)| err)
}
pub fn cancel_registration(&mut self, query_id: &kad::QueryId) -> bool {
self.registration_queries.remove(query_id).is_some()
}
pub fn unregister(&mut self, name: &str) {
self.kademlia
.stop_providing(&kad::RecordKey::new(&name.as_bytes()));
let key = format!("{}:meta:{}", name, self.local_peer_id);
self.kademlia
.remove_record(&kad::RecordKey::from(key.into_bytes()));
}
pub fn lookup(&mut self, name: impl Into<Arc<str>>) -> kad::QueryId {
self.lookup_with_reply(name.into(), None)
}
pub fn lookup_local(
&mut self,
name: &str,
) -> Result<Option<ActorRegistration<'static>>, RegistryError> {
let key = kad::RecordKey::new(&name);
let store_mut = self.kademlia.store_mut();
let is_providing = store_mut.provided().any(|k| k.key == key);
if is_providing {
let metadata_key = format!("{name}:meta:{}", self.local_peer_id);
store_mut
.get(&kad::RecordKey::new(&metadata_key))
.map(|record| {
ActorRegistration::from_bytes(&record.value)
.map(ActorRegistration::into_owned)
.map_err(RegistryError::from)
})
.transpose()
} else {
Ok(None)
}
}
pub fn cancel_lookup(&mut self, query_id: &kad::QueryId) -> bool {
self.lookup_queries.remove(query_id).is_some()
}
pub(super) fn register_with_reply(
&mut self,
name: Arc<str>,
registration: ActorRegistration<'static>,
reply: Option<RegisterReply>,
) -> Result<kad::QueryId, (Option<RegisterReply>, kad::store::Error)> {
let key = kad::RecordKey::new(&name.as_bytes());
let provider_query_id = match self.kademlia.start_providing(key) {
Ok(id) => id,
Err(err) => {
return Err((reply, err));
}
};
self.registration_queries.insert(
provider_query_id,
RegistrationQuery {
name,
registration: Some(registration),
put_query_id: None,
provider_result: None,
reply,
},
);
Ok(provider_query_id)
}
pub(super) fn lookup_with_reply(
&mut self,
name: Arc<str>,
reply: Option<LookupReply>,
) -> kad::QueryId {
let query_id = self
.kademlia
.get_providers(kad::RecordKey::new(&name.as_bytes()));
self.lookup_queries.insert(
query_id,
LookupQuery {
name,
providers_finished: false,
metadata_queries: HashSet::new(),
queried_providers: HashSet::new(),
reported_providers: HashSet::new(),
reply,
},
);
query_id
}
fn handle_kademlia_event(&mut self, ev: kad::Event) -> (bool, Option<Event>) {
match ev {
kad::Event::InboundRequest { request } => {
match request {
kad::InboundRequest::AddProvider { record } => {
let record =
record.expect("filtering is enabled, so the record should be present");
if self.validate_provider_registration(&record) {
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
if let Err(err) = self.kademlia.store_mut().add_provider(record) {
#[cfg(feature = "tracing")]
tracing::warn!("failed to store provider: {err}");
}
}
(false, None)
}
kad::InboundRequest::PutRecord { source, record, .. } => {
let record =
record.expect("filtering is enabled, so the record should be present");
if self.validate_metadata_record(&source, &record) {
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
if let Err(err) = self.kademlia.store_mut().put(record) {
#[cfg(feature = "tracing")]
tracing::warn!("failed to store metadata record: {err}");
}
}
(false, None)
}
_ => (false, None),
}
}
kad::Event::OutboundQueryProgressed {
id,
result,
stats: _,
step,
} => match result {
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
kad::QueryResult::Bootstrap(res) => {
#[cfg(feature = "tracing")]
if let Err(err) = res {
tracing::warn!("bootstrap failed: {err}");
}
let failed_peers = self
.pending_peers
.extract_if(|_, failed_at| failed_at.elapsed() > Duration::from_secs(5));
for ((peer_id, addr), _) in failed_peers {
#[cfg(feature = "tracing")]
tracing::debug!(%peer_id, %addr, "removing address for peer");
self.kademlia.remove_address(&peer_id, &addr);
}
(false, None)
}
kad::QueryResult::GetClosestPeers(_) => (false, None),
kad::QueryResult::GetProviders(res) => {
let Entry::Occupied(mut lookup_query_entry) = self.lookup_queries.entry(id)
else {
#[cfg(feature = "tracing")]
tracing::warn!("ignoring GetProviders event for unknown lookup query");
return (false, None);
};
let lookup_query = lookup_query_entry.get_mut();
match res {
Ok(kad::GetProvidersOk::FoundProviders { providers, .. }) => {
let mut wake = false;
lookup_query.providers_finished = step.last;
for provider in providers {
wake |=
lookup_query.get_metadata_record(&mut self.kademlia, &provider);
}
let last = step.last && lookup_query.is_finished();
if last {
if lookup_query.reply.is_none() {
self.pending_events.push_back(Event::LookupCompleted {
provider_query_id: id,
});
}
lookup_query_entry.remove();
}
(wake, None)
}
Ok(kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. }) => {
lookup_query.providers_finished = step.last;
let last = step.last && lookup_query.is_finished();
if last {
if lookup_query.reply.is_none() {
self.pending_events.push_back(Event::LookupCompleted {
provider_query_id: id,
});
}
lookup_query_entry.remove();
}
(false, None)
}
Err(kad::GetProvidersError::Timeout { .. }) => {
lookup_query.providers_finished = step.last;
let last = step.last && lookup_query.is_finished();
match &lookup_query.reply {
Some(tx) => {
let _ = tx.send(Err(RegistryError::Timeout));
}
None => {
self.pending_events.push_back(Event::LookupTimeout {
provider_query_id: id,
});
}
}
if last {
if lookup_query.reply.is_none() {
self.pending_events.push_back(Event::LookupCompleted {
provider_query_id: id,
});
}
lookup_query_entry.remove();
}
(false, None)
}
}
}
kad::QueryResult::StartProviding(res) => {
let Entry::Occupied(mut registration_query_entry) =
self.registration_queries.entry(id)
else {
#[cfg(feature = "tracing")]
tracing::warn!(
"ignoring StartProviding event for unknown registration query"
);
return (false, None);
};
let registration_query = registration_query_entry.get_mut();
match res {
Ok(kad::AddProviderOk { .. }) => {
let Some(registration) = registration_query.registration.take() else {
panic!("the registration should exist here");
};
registration_query.provider_result = Some(res.clone());
let key =
format!("{}:meta:{}", registration_query.name, self.local_peer_id);
let registration_bytes = registration.into_bytes();
let record = kad::Record::new(key.into_bytes(), registration_bytes);
match self.kademlia.put_record(record, kad::Quorum::One) {
Ok(put_query_id) => {
registration_query.put_query_id = Some(put_query_id);
}
Err(err) => {
match registration_query_entry.remove().reply {
Some(tx) => {
let _ = tx.send(Err(err.into()));
}
None => {
self.pending_events.push_back(
Event::RegistrationFailed {
provider_query_id: id,
error: err.into(),
},
);
}
}
}
}
(true, None)
}
Err(kad::AddProviderError::Timeout { .. }) => {
match registration_query_entry.remove().reply {
Some(tx) => {
let _ = tx.send(Err(RegistryError::Timeout));
}
None => {
self.pending_events.push_back(Event::RegistrationFailed {
provider_query_id: id,
error: RegistryError::Timeout,
});
}
}
(false, None)
}
}
}
kad::QueryResult::RepublishProvider(_) => (false, None),
kad::QueryResult::GetRecord(res) => {
let Some((provider_query_id, lookup_query)) = self
.lookup_queries
.iter_mut()
.find(|(_, lookup_query)| lookup_query.has_metadata_query(&id))
else {
#[cfg(feature = "tracing")]
tracing::warn!("ignoring GetRecord event for unknown lookup query");
return (false, None);
};
let provider_query_id = *provider_query_id;
match res {
Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {
record: kad::Record { value, .. },
..
})) => {
let result = ActorRegistration::from_bytes(&value)
.map(|registration| registration.into_owned())
.map_err(RegistryError::from);
let should_emit = if let Ok(ref registration) = result {
if let Some(peer_id) = registration.actor_id.peer_id() {
if lookup_query.reported_providers.contains(peer_id) {
false } else {
lookup_query.reported_providers.insert(*peer_id);
true
}
} else {
true }
} else {
true };
if should_emit {
match &lookup_query.reply {
Some(tx) => {
let _ = tx.send(result);
}
None => {
self.pending_events.push_back(Event::LookupProgressed {
provider_query_id,
get_query_id: id,
result,
});
}
}
}
}
Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. })
| Err(kad::GetRecordError::NotFound { .. }) => {
}
Err(kad::GetRecordError::QuorumFailed { quorum, .. }) => {
match &lookup_query.reply {
Some(tx) => {
let _ = tx.send(Err(RegistryError::QuorumFailed { quorum }));
}
None => {
self.pending_events.push_back(Event::LookupProgressed {
provider_query_id,
get_query_id: id,
result: Err(RegistryError::QuorumFailed { quorum }),
});
}
}
}
Err(kad::GetRecordError::Timeout { .. }) => match &lookup_query.reply {
Some(tx) => {
let _ = tx.send(Err(RegistryError::Timeout));
}
None => {
self.pending_events.push_back(Event::LookupProgressed {
provider_query_id,
get_query_id: id,
result: Err(RegistryError::Timeout),
});
}
},
}
if step.last {
lookup_query.metadata_query_finished(&id);
let last = lookup_query.is_finished();
if last {
if lookup_query.reply.is_none() {
self.pending_events
.push_back(Event::LookupCompleted { provider_query_id });
}
self.lookup_queries.remove(&provider_query_id);
}
}
(false, None)
}
kad::QueryResult::PutRecord(res) => {
let Some(provider_query_id) =
self.registration_queries
.iter()
.find_map(|(query_id, reg)| {
if reg.put_query_id == Some(id) {
Some(*query_id)
} else {
None
}
})
else {
#[cfg(feature = "tracing")]
tracing::warn!("ignoring PutRecord event for unknown registration query");
return (false, None);
};
match res {
Ok(ok) => {
let mut registration_query = self
.registration_queries
.remove(&provider_query_id)
.unwrap();
match registration_query.reply.take() {
Some(tx) => {
let _ = tx.send(Ok(()));
}
None => {
self.pending_events.push_back(Event::RegisteredActor {
provider_result: registration_query
.provider_result
.unwrap(),
provider_query_id,
metadata_result: Ok(ok.clone()),
metadata_query_id: id,
});
}
}
}
Err(err) => {
match self
.registration_queries
.remove(&provider_query_id)
.and_then(|q| q.reply)
{
Some(tx) => {
let _ = tx.send(Err(err.clone().into()));
}
None => {
self.pending_events.push_back(Event::RegistrationFailed {
provider_query_id,
error: err.clone().into(),
});
}
}
}
}
(false, None)
}
kad::QueryResult::RepublishRecord(_) => (false, None),
},
kad::Event::RoutingUpdated {
peer,
is_new_peer,
addresses,
bucket_range,
old_peer,
} => (
false,
Some(Event::RoutingUpdated {
peer,
is_new_peer,
addresses,
bucket_range,
old_peer,
}),
),
kad::Event::UnroutablePeer { peer } => (false, Some(Event::UnroutablePeer { peer })),
kad::Event::RoutablePeer { peer, address } => {
(false, Some(Event::RoutablePeer { peer, address }))
}
kad::Event::PendingRoutablePeer { peer, address } => {
(false, Some(Event::PendingRoutablePeer { peer, address }))
}
kad::Event::ModeChanged { .. } => (false, None),
}
}
fn validate_provider_registration(&mut self, provider: &kad::ProviderRecord) -> bool {
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
let source = &provider.provider;
let key_str = match std::str::from_utf8(provider.key.as_ref()) {
Ok(s) => s,
Err(_) => {
#[cfg(feature = "tracing")]
tracing::warn!("invalid UTF-8 in provider key from {source}");
return false;
}
};
if !self.is_valid_actor_name(key_str) {
#[cfg(feature = "tracing")]
tracing::warn!("invalid actor name in provider registration from {source}: {key_str}");
return false;
}
if !self.is_valid_provider_record(provider) {
#[cfg(feature = "tracing")]
tracing::warn!("invalid provider record from {source}");
return false;
}
#[cfg(feature = "tracing")]
tracing::debug!("validated provider registration for {key_str} from {source}");
true
}
fn is_valid_provider_record(&self, provider: &kad::ProviderRecord) -> bool {
if let Some(expires) = provider.expires {
if expires < Instant::now() {
#[cfg(feature = "tracing")]
tracing::warn!("provider record is already expired");
return false;
}
}
true
}
fn validate_metadata_record(&mut self, source: &PeerId, record: &kad::Record) -> bool {
let key_str = match str::from_utf8(record.key.as_ref()) {
Ok(s) => s,
Err(_) => {
#[cfg(feature = "tracing")]
tracing::warn!("invalid UTF-8 in metadata record key from {source}");
return false;
}
};
let parts: Vec<_> = key_str.splitn(2, ":meta:").collect();
if parts.len() != 2 {
#[cfg(feature = "tracing")]
tracing::warn!("invalid metadata key format from {source}: {key_str}");
return false;
}
let actor_name = parts[0];
let peer_id_str = parts[1];
if !self.is_valid_actor_name(actor_name) {
#[cfg(feature = "tracing")]
tracing::warn!("invalid actor name from {source}: {actor_name}");
return false;
}
let record_peer_id = match PeerId::from_str(peer_id_str) {
Ok(id) => id,
Err(_) => {
#[cfg(feature = "tracing")]
tracing::warn!("invalid peer ID in metadata key from {source}: {peer_id_str}");
return false;
}
};
if record_peer_id != *source {
#[cfg(feature = "tracing")]
tracing::warn!(
"peer ID mismatch: source {source} trying to register metadata for peer {record_peer_id}"
);
return false;
}
if !self.is_valid_metadata(&record.value) {
#[cfg(feature = "tracing")]
tracing::warn!("invalid metadata format from {source}");
return false;
}
#[cfg(feature = "tracing")]
tracing::debug!("validated metadata record for {actor_name} from {source}");
true
}
fn is_valid_actor_name(&self, name: &str) -> bool {
!name.is_empty()
}
fn is_valid_metadata(&self, data: &[u8]) -> bool {
if data.len() > 64 * 1024 {
return false;
}
ActorRegistration::from_bytes(data).is_ok()
}
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = THandler<kad::Behaviour<kad::store::MemoryStore>>;
type ToSwarm = Event;
fn handle_established_inbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
local_addr: &libp2p::Multiaddr,
remote_addr: &libp2p::Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.kademlia.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &libp2p::Multiaddr,
role_override: libp2p::core::Endpoint,
port_use: libp2p::core::transport::PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
self.kademlia.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
)
}
fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
failed_addresses,
..
}) => {
self.pending_peers.retain(|(pending_peer_id, addr), _| {
pending_peer_id != &peer_id
|| failed_addresses.iter().any(|failed_addr| failed_addr == addr)
});
}
FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => {
self.kademlia.add_address(&peer_id, addr.clone());
}
FromSwarm::DialFailure(DialFailure {
peer_id: Some(peer_id),
error: DialError::Transport(errors),
..
}) => {
let now = Instant::now();
for (addr, _) in errors {
self.pending_peers
.entry((peer_id, addr.clone()))
.or_insert(now);
}
}
_ => {}
}
self.kademlia.on_swarm_event(event)
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.kademlia
.on_connection_handler_event(peer_id, connection_id, event)
}
fn poll(
&mut self,
cx: &mut task::Context<'_>,
) -> task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
loop {
if let Some(ev) = self.pending_events.pop_front() {
if !self.pending_events.is_empty() {
cx.waker().wake_by_ref();
}
return task::Poll::Ready(ToSwarm::GenerateEvent(ev));
}
match self.kademlia.poll(cx) {
task::Poll::Ready(ToSwarm::GenerateEvent(ev)) => {
let (wake, ev) = self.handle_kademlia_event(ev);
if let Some(ev) = ev {
if wake || !self.pending_events.is_empty() {
cx.waker().wake_by_ref();
}
return task::Poll::Ready(ToSwarm::GenerateEvent(ev));
}
if wake || !self.pending_events.is_empty() {
continue;
}
continue;
}
task::Poll::Ready(other_ev) => {
if !self.pending_events.is_empty() {
cx.waker().wake_by_ref();
}
return task::Poll::Ready(
other_ev.map_out(|_| unreachable!("we handled GenerateEvent above")),
);
}
task::Poll::Pending => {
if !self.pending_events.is_empty() {
continue; }
return task::Poll::Pending;
}
}
}
}
}
struct RegistrationQuery {
name: Arc<str>,
registration: Option<ActorRegistration<'static>>,
put_query_id: Option<kad::QueryId>,
provider_result: Option<kad::AddProviderResult>,
reply: Option<RegisterReply>,
}
struct LookupQuery {
name: Arc<str>,
providers_finished: bool,
metadata_queries: HashSet<kad::QueryId>,
queried_providers: HashSet<PeerId>,
reported_providers: HashSet<PeerId>,
reply: Option<LookupReply>,
}
impl LookupQuery {
fn get_metadata_record(
&mut self,
kademlia: &mut kad::Behaviour<kad::store::MemoryStore>,
provider: &PeerId,
) -> bool {
if self.queried_providers.contains(provider) {
return false;
}
self.queried_providers.insert(*provider);
let key = format!("{}:meta:{provider}", self.name);
let query_id = kademlia.get_record(key.into_bytes().into());
self.metadata_queries.insert(query_id);
true
}
fn has_metadata_query(&self, query_id: &kad::QueryId) -> bool {
self.metadata_queries.contains(query_id)
}
fn metadata_query_finished(&mut self, query_id: &kad::QueryId) {
self.metadata_queries.remove(query_id);
}
fn is_finished(&self) -> bool {
self.providers_finished && self.metadata_queries.is_empty()
}
}
#[derive(Debug)]
pub struct ActorRegistration<'a> {
pub actor_id: ActorId,
pub remote_id: Cow<'a, str>,
}
impl<'a> ActorRegistration<'a> {
pub fn new(actor_id: ActorId, remote_id: Cow<'a, str>) -> Self {
ActorRegistration {
actor_id,
remote_id,
}
}
pub fn into_bytes(self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(1 + 8 + 42 + self.remote_id.len());
let actor_id_bytes = self.actor_id.to_bytes();
let peer_id_len = (actor_id_bytes.len() - 8) as u8;
bytes.extend_from_slice(&peer_id_len.to_le_bytes());
bytes.extend_from_slice(&actor_id_bytes);
bytes.extend_from_slice(self.remote_id.as_bytes());
bytes
}
pub fn from_bytes(bytes: &'a [u8]) -> Result<Self, InvalidActorRegistration> {
if bytes.is_empty() {
return Err(InvalidActorRegistration::EmptyActorRegistration);
}
let peer_id_bytes_len = u8::from_le_bytes(bytes[..1].try_into().unwrap()) as usize;
let actor_id = ActorId::from_bytes(&bytes[1..1 + 8 + peer_id_bytes_len])?;
let remote_id = std::str::from_utf8(&bytes[1 + 8 + peer_id_bytes_len..])
.map_err(InvalidActorRegistration::InvalidRemoteIDUtf8)?;
Ok(ActorRegistration::new(actor_id, Cow::Borrowed(remote_id)))
}
pub fn into_owned(self) -> ActorRegistration<'static> {
ActorRegistration::new(self.actor_id, Cow::Owned(self.remote_id.into_owned()))
}
}
#[derive(Debug)]
#[cfg_attr(not(feature = "remote"), derive(Clone))]
pub enum InvalidActorRegistration {
EmptyActorRegistration,
ActorId(ActorIdFromBytesError),
InvalidRemoteIDUtf8(str::Utf8Error),
}
impl From<ActorIdFromBytesError> for InvalidActorRegistration {
fn from(err: ActorIdFromBytesError) -> Self {
InvalidActorRegistration::ActorId(err)
}
}
impl fmt::Display for InvalidActorRegistration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InvalidActorRegistration::EmptyActorRegistration => {
write!(f, "empty actor registration")
}
InvalidActorRegistration::ActorId(err) => err.fmt(f),
InvalidActorRegistration::InvalidRemoteIDUtf8(err) => err.fmt(f),
}
}
}