use std::error;
use std::hash::Hash;
#[cfg(feature = "remote")]
use std::hash::Hasher;
use std::sync::atomic::Ordering;
use std::{fmt, sync::atomic::AtomicUsize};
use serde::{Deserialize, Serialize};
#[cfg(feature = "remote")]
use crate::remote::ActorSwarm;
/// Process-wide counter that `ActorId::generate` increments (with relaxed ordering) to obtain
/// the next sequence ID. Starts at zero.
static ACTOR_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Identifier for an actor.
///
/// An ID is made of a numeric sequence ID and, when the `remote` feature is enabled, the kind of
/// peer the actor belongs to.
///
/// The comparison and hashing traits are derived, so they operate on the fields in declaration
/// order: with the `remote` feature the peer is considered before the sequence ID.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActorId {
// Peer the actor lives on; only present with the `remote` feature.
#[cfg(feature = "remote")]
peer_id: PeerIdKind,
// Numeric part of the identifier.
sequence_id: u64,
}
impl ActorId {
    /// Creates an `ActorId` with the given sequence ID.
    ///
    /// With the `remote` feature enabled the ID is marked as belonging to the local peer.
    pub fn new(sequence_id: u64) -> Self {
        ActorId {
            sequence_id,
            #[cfg(feature = "remote")]
            peer_id: PeerIdKind::Local,
        }
    }

    /// Creates an `ActorId` with the given sequence ID that is bound to an explicit peer.
    #[cfg(feature = "remote")]
    pub fn new_with_peer_id(sequence_id: u64, peer_id: libp2p::PeerId) -> Self {
        ActorId {
            sequence_id,
            peer_id: PeerIdKind::PeerId(peer_id),
        }
    }

    /// Creates an `ActorId` whose sequence ID is the next value of the process-wide counter.
    ///
    /// # Panics
    ///
    /// Panics if the counter value does not fit in a `u64`.
    pub fn generate() -> Self {
        let next = ACTOR_COUNTER.fetch_add(1, Ordering::Relaxed);
        ActorId::new(
            next.try_into()
                .expect("actor counter value should fit in a u64"),
        )
    }

    /// Returns the sequence ID of this actor.
    pub fn sequence_id(&self) -> u64 {
        self.sequence_id
    }

    /// Returns the peer ID associated with this actor, if one can be resolved.
    #[cfg(feature = "remote")]
    pub fn peer_id(&self) -> Option<&libp2p::PeerId> {
        self.peer_id.peer_id()
    }

    /// Encodes the ID as bytes.
    ///
    /// The output starts with the sequence ID as 8 little-endian bytes. With the `remote`
    /// feature enabled, the bytes of the peer ID are appended when a peer ID is available.
    pub fn to_bytes(&self) -> Vec<u8> {
        // 8 bytes for the sequence ID plus room for a peer ID.
        // NOTE(review): 42 presumably bounds the encoded peer ID length — confirm.
        let mut bytes = Vec::with_capacity(8 + 42);
        bytes.extend_from_slice(&self.sequence_id.to_le_bytes());

        #[cfg(feature = "remote")]
        {
            let peer_id_bytes = self
                .peer_id()
                .map(|peer_id| peer_id.to_bytes())
                .or_else(|| ActorSwarm::get().map(|swarm| swarm.local_peer_id().to_bytes()));
            if let Some(peer_id_bytes) = peer_id_bytes {
                bytes.extend(peer_id_bytes);
            }
        }

        bytes
    }

    /// Decodes an ID from the byte layout produced by [`ActorId::to_bytes`].
    ///
    /// With the `remote` feature enabled, any bytes after the first 8 are parsed as a peer ID;
    /// if there are none, the ID is treated as local.
    ///
    /// # Errors
    ///
    /// Returns [`ActorIdFromBytesError::MissingSequenceID`] when fewer than 8 bytes are supplied,
    /// and (with the `remote` feature) a peer ID parse error when the trailing bytes are invalid.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ActorIdFromBytesError> {
        // Checked access: indexing `bytes[0..8]` would panic on short input instead of
        // reporting the error.
        let sequence_bytes = bytes
            .get(..8)
            .ok_or(ActorIdFromBytesError::MissingSequenceID)?;
        let sequence_id = u64::from_le_bytes(
            sequence_bytes
                .try_into()
                .map_err(|_| ActorIdFromBytesError::MissingSequenceID)?,
        );

        #[cfg(feature = "remote")]
        let peer_id = if bytes.len() > 8 {
            PeerIdKind::PeerId(libp2p::PeerId::from_bytes(&bytes[8..])?)
        } else {
            PeerIdKind::Local
        };

        Ok(ActorId {
            sequence_id,
            #[cfg(feature = "remote")]
            peer_id,
        })
    }
}
impl fmt::Display for ActorId {
    /// Writes the ID as `#<sequence>`; with the `remote` feature the peer is appended after an
    /// `@`, using `local` when no peer ID can be resolved.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sequence = self.sequence_id;

        #[cfg(feature = "remote")]
        {
            if let Some(peer_id) = self.peer_id.peer_id() {
                write!(f, "#{}@{peer_id}", sequence)
            } else {
                write!(f, "#{}@local", sequence)
            }
        }

        #[cfg(not(feature = "remote"))]
        {
            write!(f, "#{}", sequence)
        }
    }
}
impl fmt::Debug for ActorId {
    /// Writes `ActorId(<sequence>)`, or `ActorId(<sequence>, <peer>)` with the `remote` feature,
    /// where `<peer>` is the debug form of the optionally resolved peer ID.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[cfg(not(feature = "remote"))]
        {
            write!(f, "ActorId({:?})", self.sequence_id)
        }

        #[cfg(feature = "remote")]
        {
            let resolved_peer = self.peer_id.peer_id();
            write!(f, "ActorId({:?}, {:?})", self.sequence_id, resolved_peer)
        }
    }
}
impl Serialize for ActorId {
    /// Serializes the ID as a single byte string, using the encoding of `ActorId::to_bytes`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let encoded = self.to_bytes();
        serializer.serialize_bytes(encoded.as_slice())
    }
}
impl<'de> Deserialize<'de> for ActorId {
    /// Deserializes an ID from the byte encoding understood by `ActorId::from_bytes`.
    ///
    /// The bytes may arrive as a borrowed byte slice, an owned byte buffer, or a sequence of
    /// `u8` values. The sequence form is what formats without a native byte-string type produce
    /// for data written with `serialize_bytes`, so accepting it lets such formats round-trip.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        struct ActorIdVisitor;

        impl<'de> serde::de::Visitor<'de> for ActorIdVisitor {
            type Value = ActorId;

            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                formatter.write_str("bytes representing an ActorId")
            }

            fn visit_bytes<E>(self, bytes: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                let bytes_len = bytes.len();
                ActorId::from_bytes(bytes).map_err(|err| match err {
                    ActorIdFromBytesError::MissingSequenceID => {
                        E::invalid_length(bytes_len, &"sequence ID")
                    }
                    #[cfg(feature = "remote")]
                    err @ ActorIdFromBytesError::ParsePeerID(_) => E::custom(err),
                })
            }

            fn visit_byte_buf<E>(self, bytes: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                self.visit_bytes(&bytes)
            }

            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::SeqAccess<'de>,
            {
                // Cap the pre-allocation so an untrusted size hint cannot force a large
                // allocation; the vector still grows as needed.
                let mut bytes = Vec::with_capacity(seq.size_hint().unwrap_or(0).min(64));
                while let Some(byte) = seq.next_element::<u8>()? {
                    bytes.push(byte);
                }
                self.visit_bytes(&bytes)
            }
        }

        deserializer.deserialize_bytes(ActorIdVisitor)
    }
}
/// Errors produced when decoding an `ActorId` from bytes.
// `Clone` is only derived without the `remote` feature.
// NOTE(review): presumably because `libp2p_identity::ParseError` is not `Clone` — confirm.
#[derive(Debug)]
#[cfg_attr(not(feature = "remote"), derive(Clone))]
pub enum ActorIdFromBytesError {
/// The sequence ID could not be read from the input bytes.
MissingSequenceID,
/// The peer ID portion of the input could not be parsed (`remote` feature only).
#[cfg(feature = "remote")]
ParsePeerID(libp2p_identity::ParseError),
}
#[cfg(feature = "remote")]
impl From<libp2p_identity::ParseError> for ActorIdFromBytesError {
    /// Wraps a peer ID parse failure so it can be propagated with `?`.
    fn from(err: libp2p_identity::ParseError) -> Self {
        Self::ParsePeerID(err)
    }
}
impl fmt::Display for ActorIdFromBytesError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ActorIdFromBytesError::MissingSequenceID => write!(f, "missing instance ID"),
#[cfg(feature = "remote")]
ActorIdFromBytesError::ParsePeerID(err) => err.fmt(f),
}
}
}
// Empty impl: relies on the trait's default methods, so no underlying `source` is exposed.
impl error::Error for ActorIdFromBytesError {}
/// The peer an `ActorId` belongs to (`remote` feature only).
#[cfg(feature = "remote")]
#[derive(Clone, Copy)]
enum PeerIdKind {
/// No peer ID is stored; it is looked up from the `ActorSwarm` on demand, if one is available.
Local,
/// An explicitly stored peer ID.
PeerId(libp2p::PeerId),
}
#[cfg(feature = "remote")]
impl PeerIdKind {
    /// Resolves this kind to a concrete peer ID.
    ///
    /// An explicit peer ID is returned as is. A local kind resolves to the swarm's local peer
    /// ID, or `None` when `ActorSwarm::get` yields nothing.
    fn peer_id(&self) -> Option<&libp2p::PeerId> {
        if let PeerIdKind::PeerId(explicit) = self {
            return Some(explicit);
        }
        ActorSwarm::get().map(ActorSwarm::local_peer_id)
    }
}
#[cfg(feature = "remote")]
impl PartialEq for PeerIdKind {
    /// Two kinds are equal when they resolve to the same optional peer ID, regardless of
    /// which variant each one is.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.peer_id();
        let rhs = other.peer_id();
        lhs == rhs
    }
}
// Marker impl on top of the manual `PartialEq`, which compares resolved peer IDs.
#[cfg(feature = "remote")]
impl Eq for PeerIdKind {}
#[cfg(feature = "remote")]
impl PartialOrd for PeerIdKind {
    /// Always yields `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
#[cfg(feature = "remote")]
impl Ord for PeerIdKind {
    /// Orders kinds by their resolved optional peer IDs.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = self.peer_id();
        let rhs = other.peer_id();
        Ord::cmp(&lhs, &rhs)
    }
}
#[cfg(feature = "remote")]
impl Hash for PeerIdKind {
    /// Feeds the bytes of the resolved peer ID into the hasher; writes nothing when no peer ID
    /// can be resolved. This mirrors the manual `PartialEq`, which also compares resolved IDs.
    fn hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        let encoded = self.peer_id().map(|peer_id| peer_id.to_bytes());
        if let Some(encoded) = encoded {
            state.write(&encoded);
        }
    }
}
#[cfg(test)]
mod tests {
    use std::hash::{DefaultHasher, Hasher};

    #[cfg(feature = "remote")]
    use libp2p::PeerId;

    use super::*;

    // Both remote tests wait on this barrier before registering the swarm, so the assertions
    // made while no swarm is registered by these tests run in both of them first.
    // NOTE(review): a two-party barrier needs both tests running concurrently; with a single
    // test thread the first `wait` would never return.
    #[cfg(feature = "remote")]
    static BARRIER: std::sync::Barrier = std::sync::Barrier::new(2);

    /// Fixed peer ID used as the local swarm's identity in the remote tests.
    #[cfg(feature = "remote")]
    fn local_peer_id() -> PeerId {
        PeerId::from_bytes(&[
            0, 32, 77, 249, 14, 119, 133, 11, 205, 96, 61, 232, 63, 206, 126, 234, 204, 60, 241,
            93, 2, 68, 130, 67, 3, 193, 242, 23, 80, 189, 82, 144, 152, 206,
        ])
        .unwrap()
    }

    /// Builds an ID with the given sequence ID and, with the `remote` feature, a local peer.
    fn local(sequence_id: u64) -> ActorId {
        ActorId {
            sequence_id,
            #[cfg(feature = "remote")]
            peer_id: PeerIdKind::Local,
        }
    }

    /// Builds an ID with the given sequence ID bound to an explicit peer.
    #[cfg(feature = "remote")]
    fn on_peer(sequence_id: u64, peer_id: PeerId) -> ActorId {
        ActorId {
            sequence_id,
            peer_id: PeerIdKind::PeerId(peer_id),
        }
    }

    /// Hashes a single ID with a fresh `DefaultHasher`.
    fn hash_of(id: &ActorId) -> u64 {
        let mut hasher = DefaultHasher::new();
        id.hash(&mut hasher);
        hasher.finish()
    }

    /// Returns whether both IDs produce the same hash.
    fn hashes_eq(id1: &ActorId, id2: &ActorId) -> bool {
        hash_of(id1) == hash_of(id2)
    }

    #[test]
    fn test_actor_id_partial_eq_local() {
        assert_eq!(local(0), local(0));
        assert_ne!(local(0), local(1));
    }

    #[test]
    #[cfg(feature = "remote")]
    fn test_actor_id_partial_eq_remote() {
        use tokio::sync::mpsc;

        let id1 = local(0);
        let id2 = local(0);
        assert_eq!(id1, id2);

        BARRIER.wait();
        let swarm_peer = local_peer_id();
        // The result is ignored: the other remote test may already have registered the swarm.
        let _ = ActorSwarm::set(mpsc::unbounded_channel().0, swarm_peer);

        // Local IDs now resolve to the swarm's peer ID.
        assert_eq!(id1.peer_id(), Some(&swarm_peer));
        assert_eq!(id2.peer_id(), Some(&swarm_peer));
        assert_eq!(id1, id2);

        // A local ID equals an explicit ID that names the local peer.
        assert_eq!(local(0), on_peer(0, swarm_peer));
        assert_eq!(on_peer(0, swarm_peer), on_peer(0, swarm_peer));
        assert_ne!(on_peer(0, swarm_peer), on_peer(0, PeerId::random()));
    }

    #[test]
    fn test_actor_id_hash_local() {
        assert!(hashes_eq(&local(0), &local(0)));
        assert!(!hashes_eq(&local(0), &local(1)));
    }

    #[test]
    #[cfg(feature = "remote")]
    fn test_actor_id_hash_remote() {
        use tokio::sync::mpsc;

        let id1 = local(0);
        let id2 = local(0);
        assert!(hashes_eq(&id1, &id2));

        BARRIER.wait();
        let swarm_peer = local_peer_id();
        // The result is ignored: the other remote test may already have registered the swarm.
        let _ = ActorSwarm::set(mpsc::unbounded_channel().0, swarm_peer);

        assert_eq!(id1.peer_id(), Some(&swarm_peer));
        assert_eq!(id2.peer_id(), Some(&swarm_peer));
        assert_eq!(id1, id2);
        // Equal IDs must also hash equally once the local peer resolves to the swarm's ID.
        assert!(hashes_eq(&id1, &id2));

        // A local ID hashes like an explicit ID that names the local peer.
        assert!(hashes_eq(&local(0), &on_peer(0, swarm_peer)));
        assert!(hashes_eq(&on_peer(0, swarm_peer), &on_peer(0, swarm_peer)));
        assert!(!hashes_eq(
            &on_peer(0, swarm_peer),
            &on_peer(0, PeerId::random())
        ));
    }
}