// piying 0.1.1
//
// Fault-tolerant Async Actors Built on Tokio
// Documentation
use std::error;
use std::hash::Hash;
#[cfg(feature = "remote")]
use std::hash::Hasher;
use std::sync::atomic::Ordering;
use std::{fmt, sync::atomic::AtomicUsize};

#[cfg(feature = "remote")]
use crate::remote::ActorSwarm;

/// Process-wide counter; `ActorId::generate` increments it to obtain the next `sequence_id`.
static ACTOR_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// A globally unique identifier for an actor within a distributed system.
///
/// `ActorId` combines a locally sequential `sequence_id` with an optional `peer_id`
/// to uniquely identify actors across a distributed network.
///
/// The `peer_id` field is only compiled in when the `remote` feature is enabled;
/// without that feature an `ActorId` consists of the `sequence_id` alone.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActorId {
    /// Peer the actor belongs to: either `PeerIdKind::Local` or an explicit `PeerId`.
    #[cfg(feature = "remote")]
    peer_id: PeerIdKind,
    /// Sequential identifier of the actor.
    sequence_id: u64,
}

impl ActorId {
    /// Creates a new `ActorId` with the given `sequence_id`, using the local actor swarm.
    ///
    /// With the `remote` feature enabled the id is tagged as `PeerIdKind::Local`, i.e. no
    /// explicit `peer_id` is stored. If the local actor swarm hasn't been bootstrapped, no
    /// `peer_id` will be associated, but the actor is still considered to be running locally.
    ///
    /// # Arguments
    ///
    /// * `sequence_id` - The sequential identifier for the actor.
    ///
    /// # Returns
    ///
    /// A new `ActorId` instance.
    pub fn new(sequence_id: u64) -> Self {
        ActorId {
            sequence_id,
            #[cfg(feature = "remote")]
            peer_id: PeerIdKind::Local,
        }
    }

    /// Creates a new `ActorId` with a specific `sequence_id` and `peer_id`.
    ///
    /// Only available with the `remote` feature.
    ///
    /// # Arguments
    ///
    /// * `sequence_id` - The sequential identifier for the actor.
    /// * `peer_id` - The `PeerId` associated with this actor.
    ///
    /// # Returns
    ///
    /// A new `ActorId` instance.
    #[cfg(feature = "remote")]
    pub fn new_with_peer_id(sequence_id: u64, peer_id: libp2p::PeerId) -> Self {
        ActorId {
            sequence_id,
            peer_id: PeerIdKind::PeerId(peer_id),
        }
    }

    /// Generates a new `ActorId` with an automatically incremented `sequence_id`.
    ///
    /// Uses an atomic counter to ensure unique `sequence_id` values across threads.
    ///
    /// # Returns
    ///
    /// A new `ActorId` instance with the next available `sequence_id`.
    ///
    /// # Panics
    ///
    /// Panics if the counter value cannot be represented as a `u64`, which cannot happen
    /// on targets where `usize` is at most 64 bits wide.
    pub fn generate() -> Self {
        // Only the atomicity of the increment matters here; no other memory is
        // synchronised through the counter, so `Relaxed` is sufficient.
        let next = ACTOR_COUNTER.fetch_add(1, Ordering::Relaxed);
        let sequence_id: u64 = next
            .try_into()
            .expect("usize counter value always fits in u64");
        ActorId::new(sequence_id)
    }

    /// Returns the sequential identifier of the actor.
    ///
    /// Ids handed out by [`ActorId::generate`] start at 0 and grow by one per call, so the
    /// first generated actor gets id 0, the second 1, and so on.
    ///
    /// # Returns
    ///
    /// A `u64` representing the actor's `sequence_id`.
    pub fn sequence_id(&self) -> u64 {
        self.sequence_id
    }

    /// Returns the `PeerId` associated with the `ActorId`, if any.
    ///
    /// Only available with the `remote` feature.
    ///
    /// # Returns
    ///
    /// An `Option<PeerId>`. `None` is returned if the peer ID is local and no actor swarm has been bootstrapped.
    #[cfg(feature = "remote")]
    pub fn peer_id(&self) -> Option<libp2p::PeerId> {
        self.peer_id.peer_id()
    }

    /// Serializes the `ActorId` into a byte vector.
    ///
    /// Layout: the `sequence_id` as 8 little-endian bytes, followed (only with the `remote`
    /// feature, and only when a peer id can be resolved) by the peer id bytes.
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing the serialized `ActorId`.
    pub fn to_bytes(&self) -> Vec<u8> {
        // 8 bytes for the sequence id plus headroom for the peer id.
        // NOTE(review): 42 presumably matches the encoded `PeerId` length — confirm.
        let mut bytes = Vec::with_capacity(8 + 42);
        bytes.extend_from_slice(&self.sequence_id.to_le_bytes());

        #[cfg(feature = "remote")]
        {
            let peer_id_bytes = self
                .peer_id()
                .map(|peer_id| peer_id.to_bytes())
                .or_else(|| ActorSwarm::with(|s| s.local_peer_id().to_bytes()));

            if let Some(peer_id_bytes) = peer_id_bytes {
                bytes.extend(peer_id_bytes);
            }
        }

        bytes
    }

    /// Deserializes an `ActorId` from a byte slice.
    ///
    /// The first 8 bytes are read as the little-endian `sequence_id`. With the `remote`
    /// feature, any remaining bytes are parsed as a `PeerId`; without it they are ignored.
    ///
    /// # Arguments
    ///
    /// * `bytes` - A byte slice containing a serialized `ActorId`.
    ///
    /// # Returns
    ///
    /// A `Result` containing either the deserialized `ActorId` or an `ActorIdFromBytesError`.
    ///
    /// # Errors
    ///
    /// * `ActorIdFromBytesError::MissingSequenceID` if `bytes` is shorter than 8 bytes.
    /// * `ActorIdFromBytesError::ParsePeerID` (`remote` feature only) if the trailing bytes
    ///   are not a valid `PeerId`.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ActorIdFromBytesError> {
        // Checked access: plain indexing (`bytes[0..8]`) would panic on short input
        // instead of reporting `MissingSequenceID`.
        let mut sequence_bytes = [0u8; 8];
        match bytes.get(..8) {
            Some(head) => sequence_bytes.copy_from_slice(head),
            None => return Err(ActorIdFromBytesError::MissingSequenceID),
        }
        let sequence_id = u64::from_le_bytes(sequence_bytes);

        // Extract the peer id
        #[cfg(feature = "remote")]
        let peer_id = if bytes.len() > 8 {
            PeerIdKind::PeerId(libp2p::PeerId::from_bytes(&bytes[8..])?)
        } else {
            PeerIdKind::Local
        };

        Ok(ActorId {
            sequence_id,
            #[cfg(feature = "remote")]
            peer_id,
        })
    }
}

impl fmt::Display for ActorId {
    /// Formats the id as `ActorId(<sequence_id>)`.
    ///
    /// With the `remote` feature the resolved peer id (or the word `local` when none can
    /// be resolved) is appended after the sequence id.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let seq = self.sequence_id;

        #[cfg(feature = "remote")]
        {
            if let Some(peer_id) = self.peer_id.peer_id() {
                write!(f, "ActorId({}, {peer_id})", seq)
            } else {
                write!(f, "ActorId({}, local)", seq)
            }
        }

        #[cfg(not(feature = "remote"))]
        {
            write!(f, "ActorId({})", seq)
        }
    }
}

impl fmt::Debug for ActorId {
    /// Formats the id for diagnostics as `ActorId(<sequence_id>)`; with the `remote`
    /// feature the `Debug` form of the resolved `Option<PeerId>` follows the sequence id.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[cfg(feature = "remote")]
        {
            write!(
                f,
                "ActorId({:?}, {:?})",
                self.sequence_id,
                self.peer_id.peer_id()
            )
        }

        #[cfg(not(feature = "remote"))]
        {
            write!(f, "ActorId({:?})", self.sequence_id)
        }
    }
}

#[cfg(feature = "serde")]
impl serde::Serialize for ActorId {
    /// Serializes the id as a single byte string, using the encoding produced by
    /// `ActorId::to_bytes`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let encoded = self.to_bytes();
        serializer.serialize_bytes(encoded.as_slice())
    }
}

#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for ActorId {
    /// Deserializes an id from a byte string by delegating to `ActorId::from_bytes`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        /// Visitor accepting borrowed or owned bytes and decoding them as an `ActorId`.
        struct BytesVisitor;

        impl<'de> serde::de::Visitor<'de> for BytesVisitor {
            type Value = ActorId;

            fn expecting(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
                out.write_str("bytes representing an ActorId")
            }

            fn visit_bytes<E>(self, raw: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // Translate the crate's own error into the deserializer's error type.
                match ActorId::from_bytes(raw) {
                    Ok(id) => Ok(id),
                    Err(ActorIdFromBytesError::MissingSequenceID) => {
                        Err(E::invalid_length(raw.len(), &"sequence ID"))
                    }
                    #[cfg(feature = "remote")]
                    Err(err @ ActorIdFromBytesError::ParsePeerID(_)) => Err(E::custom(err)),
                }
            }

            fn visit_byte_buf<E>(self, owned: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // Owned buffers take the same path as borrowed ones.
                self.visit_bytes(owned.as_slice())
            }
        }

        deserializer.deserialize_bytes(BytesVisitor)
    }
}

/// Errors that can occur when deserializing an `ActorId` from bytes.
///
/// This is the error type returned by `ActorId::from_bytes`.
#[derive(Debug)]
pub enum ActorIdFromBytesError {
    /// The byte slice doesn't contain enough data for the `sequence_id`.
    MissingSequenceID,
    /// An error occurred while parsing the `PeerId`.
    ///
    /// Only exists when the `remote` feature is enabled.
    #[cfg(feature = "remote")]
    ParsePeerID(libp2p::identity::ParseError),
}

#[cfg(feature = "remote")]
impl From<libp2p::identity::ParseError> for ActorIdFromBytesError {
    /// Wraps a peer id parse failure in the `ParsePeerID` variant, so `?` can be used
    /// on `PeerId` parsing results.
    fn from(source: libp2p::identity::ParseError) -> Self {
        Self::ParsePeerID(source)
    }
}

impl fmt::Display for ActorIdFromBytesError {
    /// Writes a short human-readable description of the error.
    ///
    /// `MissingSequenceID` prints a fixed message; `ParsePeerID` (`remote` feature only)
    /// forwards to the wrapped parse error's own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Wording matches the variant name and the `sequence_id` terminology used
            // everywhere else for this field.
            ActorIdFromBytesError::MissingSequenceID => f.write_str("missing sequence ID"),
            #[cfg(feature = "remote")]
            // Fully qualified so the call does not depend on which `fmt` traits are in scope.
            ActorIdFromBytesError::ParsePeerID(err) => fmt::Display::fmt(err, f),
        }
    }
}

/// Marks `ActorIdFromBytesError` as a standard error type; every `Error` method keeps
/// its default implementation.
impl error::Error for ActorIdFromBytesError {}

/// Source of the peer id attached to an `ActorId` (compiled only with the `remote` feature).
#[cfg(feature = "remote")]
#[derive(Clone, Copy)]
enum PeerIdKind {
    /// No peer id is stored; it is looked up through `ActorSwarm::with` when requested.
    Local,
    /// An explicitly stored peer id.
    PeerId(libp2p::PeerId),
}

#[cfg(feature = "remote")]
impl PeerIdKind {
    /// Resolves this kind to a concrete peer id.
    ///
    /// An explicit `PeerId` is returned as-is; `Local` is resolved by asking the actor
    /// swarm for its local peer id via `ActorSwarm::with`.
    fn peer_id(&self) -> Option<libp2p::PeerId> {
        if let PeerIdKind::PeerId(explicit) = self {
            return Some(*explicit);
        }

        ActorSwarm::with(|swarm| swarm.local_peer_id())
    }
}

#[cfg(feature = "remote")]
impl PartialEq for PeerIdKind {
    /// Two kinds are equal when they resolve to the same `Option<PeerId>`, regardless of
    /// which variant each side is.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.peer_id();
        let rhs = other.peer_id();
        lhs == rhs
    }
}

/// Marker impl on top of the hand-written `PartialEq`; needed because `ActorId`, which
/// holds a `PeerIdKind` field, derives `Eq`.
#[cfg(feature = "remote")]
impl Eq for PeerIdKind {}

#[cfg(feature = "remote")]
impl PartialOrd for PeerIdKind {
    /// Always yields `Some`, deferring to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

#[cfg(feature = "remote")]
impl Ord for PeerIdKind {
    /// Orders two kinds by comparing the `Option<PeerId>` each one resolves to.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = self.peer_id();
        let rhs = other.peer_id();
        lhs.cmp(&rhs)
    }
}

#[cfg(feature = "remote")]
impl Hash for PeerIdKind {
    /// Feeds the resolved peer id's bytes into the hasher; nothing is written when no
    /// peer id can be resolved.
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self.peer_id() {
            Some(resolved) => state.write(&resolved.to_bytes()),
            None => {}
        }
    }
}

// Textually includes the contents of `id/tests.rs` at this point in the file.
include!("id/tests.rs");