piying 0.1.1

Fault-tolerant Async Actors Built on Tokio
Documentation
/// Events emitted by the remote behaviour.
///
/// These events are generated by the underlying messaging and registry behaviours
/// and provide information about remote actor operations, network changes, and
/// communication status.
///
/// # Example
///
/// ```no_run
/// use piying::remote;
/// use libp2p::swarm::SwarmEvent;
/// #
/// # let swarm_event: SwarmEvent<remote::Event> = todo!();
///
/// // In your swarm event loop:
/// match swarm_event {
///     SwarmEvent::Behaviour(remote::Event::Registry(registry_event)) => {
///         // Handle registry events (actor registration, lookup, etc.)
///     }
///     SwarmEvent::Behaviour(remote::Event::Messaging(messaging_event)) => {
///         // Handle messaging events (message delivery, timeouts, etc.)
///     }
///     _ => {}
/// }
/// ```
#[derive(Debug)]
pub enum Event {
    /// An event from the messaging subsystem.
    ///
    /// These events relate to sending and receiving messages between remote actors,
    /// including delivery confirmations, timeouts, and connection status updates.
    Messaging(messaging::Event),

    /// An event from the registry subsystem.
    ///
    /// These events relate to actor registration and discovery operations,
    /// including successful registrations, lookup results, and network topology changes.
    Registry(registry::Event),
}

impl<C> NetworkBehaviour for Behaviour<C>
where
    C: libp2p::request_response::Codec<
            Protocol = libp2p::StreamProtocol,
            Request = messaging::SwarmRequest,
            Response = messaging::SwarmResponse,
        > + Clone
        + Send
        + 'static,
{
    type ConnectionHandler =
        ConnectionHandlerSelect<THandler<messaging::Behaviour<C>>, THandler<registry::Behaviour>>;
    type ToSwarm = Event;

    fn handle_pending_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<(), ConnectionDenied> {
        self.messaging
            .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;

        self.registry
            .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;

        Ok(())
    }

    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(ConnectionHandler::select(
            self.messaging.handle_established_inbound_connection(
                connection_id,
                peer,
                local_addr,
                remote_addr,
            )?,
            self.registry.handle_established_inbound_connection(
                connection_id,
                peer,
                local_addr,
                remote_addr,
            )?,
        ))
    }

    fn handle_pending_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        addresses: &[Multiaddr],
        effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        let mut combined_addresses = Vec::new();

        combined_addresses.extend(self.messaging.handle_pending_outbound_connection(
            connection_id,
            maybe_peer,
            addresses,
            effective_role,
        )?);

        combined_addresses.extend(self.registry.handle_pending_outbound_connection(
            connection_id,
            maybe_peer,
            addresses,
            effective_role,
        )?);

        Ok(combined_addresses)
    }

    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(ConnectionHandler::select(
            self.messaging.handle_established_outbound_connection(
                connection_id,
                peer,
                addr,
                role_override,
                port_use,
            )?,
            self.registry.handle_established_outbound_connection(
                connection_id,
                peer,
                addr,
                role_override,
                port_use,
            )?,
        ))
    }

    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        match event {
            Either::Left(ev) => {
                self.messaging
                    .on_connection_handler_event(peer_id, connection_id, ev)
            }
            Either::Right(ev) => {
                self.registry
                    .on_connection_handler_event(peer_id, connection_id, ev)
            }
        }
    }

    fn poll(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        match self.cmd_rx.poll_recv(cx) {
            task::Poll::Ready(Some(cmd)) => {
                if self.handle_command(cmd) {
                    cx.waker().wake_by_ref();
                }
            }
            task::Poll::Ready(None) => {}
            task::Poll::Pending => {}
        }

        match self.messaging.poll(cx) {
            task::Poll::Ready(ev) => {
                return task::Poll::Ready(ev.map_in(Either::Left).map_out(Event::Messaging));
            }
            task::Poll::Pending => {}
        }

        match self.registry.poll(cx) {
            task::Poll::Ready(ev) => {
                return task::Poll::Ready(ev.map_in(Either::Right).map_out(Event::Registry));
            }
            task::Poll::Pending => {}
        }

        task::Poll::Pending
    }

    fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
        if let FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) = &event {
            let peer_id = *peer_id;
            tokio::spawn(async move {
                let mut futures = FuturesUnordered::new();
                for RemoteRegistryActorRef {
                    signal_mailbox,
                    links,
                    ..
                } in REMOTE_REGISTRY.lock().await.values()
                {
                    for linked_actor_id in (*links.lock().await).keys() {
                        if linked_actor_id.peer_id() == Some(peer_id) {
                            let signal_mailbox = signal_mailbox.clone();
                            let linked_actor_id = *linked_actor_id;
                            futures.push(async move {
                                signal_mailbox
                                    .signal_link_died(
                                        linked_actor_id,
                                        ActorStopReason::PeerDisconnected,
                                    )
                                    .await
                            });
                        }
                    }
                }

                while (futures.next().await).is_some() {}
            });
        }

        self.messaging.on_swarm_event(event);
        self.registry.on_swarm_event(event);
    }
}