Skip to main content

atomr_remote/transport/
mod.rs

1//! Transport abstraction. akka.net: `Remote/Transport/Transport.cs`.
2//!
3//! A `Transport` is a bidirectional, addressable, frame-oriented channel
4//! between two `ActorSystem`s. The Akka protocol layer
5//! ([`AkkaProtocolTransport`]) sits on top of the raw `Transport` and
6//! handles handshake, heartbeat, ack, and disassociate PDUs.
7
8pub mod akka_protocol;
9mod failure_injector;
10mod tcp;
11mod test_transport;
12mod throttle;
13
14pub use akka_protocol::AkkaProtocolTransport;
15pub use failure_injector::{FailureInjectorTransport, InjectionMode};
16pub use tcp::TcpTransport;
17pub use test_transport::TestTransport;
18pub use throttle::{ThrottleMode, ThrottleTransport};
19
20use async_trait::async_trait;
21use thiserror::Error;
22use tokio::sync::mpsc;
23
24use atomr_core::actor::Address;
25
26use crate::pdu::AkkaPdu;
27
/// Errors returned by [`Transport`] operations.
///
/// The `Io` and `Codec` variants are marked `#[from]`, so the wrapped
/// error types convert into `TransportError` automatically when
/// propagated with `?`.
#[derive(Debug, Error)]
pub enum TransportError {
    /// Wraps a `std::io::Error` raised by an underlying I/O operation.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// Wraps a `crate::codec::CodecError`.
    #[error("codec: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// No association exists for the peer named by the payload.
    // NOTE(review): the `String` is presumably the peer address rendered
    // as text — confirm against the sites that construct this variant.
    #[error("not associated with `{0}`")]
    NotAssociated(String),
    /// The transport has been closed.
    #[error("transport closed")]
    Closed,
    /// The handshake was rejected; the payload is appended to the message.
    // NOTE(review): presumably the rejection reason — confirm.
    #[error("handshake rejected: {0}")]
    HandshakeRejected(String),
    /// Catch-all for failures specific to a single transport
    /// implementation, described by a free-form message.
    #[error("transport-specific: {0}")]
    Other(String),
}
43
/// A frame received from a remote peer.
///
/// This is the item type yielded by the receiver that
/// [`Transport::inbound`] hands out.
#[derive(Debug)]
pub struct InboundFrame {
    /// Address of the peer the frame was received from.
    // NOTE(review): presumably the remote system's advertised address
    // rather than the socket-level peer address — confirm per transport.
    pub from: Address,
    /// The protocol data unit carried by this frame.
    pub pdu: AkkaPdu,
}
50
/// Bidirectional, frame-oriented connectivity to other `ActorSystem`s.
///
/// Every method takes `&self`, and the trait requires `Send + Sync`, so
/// an implementation that keeps mutable state (listeners, association
/// tables, ...) has to manage it through interior mutability.
///
/// `#[async_trait]` is what allows the `async fn` declarations below to
/// appear in a trait definition.
#[async_trait]
pub trait Transport: Send + Sync {
    /// Bind a listener and return the local `Address`.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if the listener cannot be set up.
    async fn listen(&self) -> Result<Address, TransportError>;

    /// Open (or reuse) an outbound association to `target`.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if the association cannot be
    /// established.
    async fn associate(&self, target: &Address) -> Result<(), TransportError>;

    /// Send a single PDU to the peer at `target`. Implementations are
    /// expected to associate lazily if needed.
    ///
    /// The PDU is taken by value, so the caller gives up ownership of it.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if the PDU cannot be sent.
    async fn send(&self, target: &Address, pdu: AkkaPdu) -> Result<(), TransportError>;

    /// Take ownership of the inbound stream. Calling more than once
    /// returns an empty channel.
    ///
    /// The receiver is unbounded, so this channel applies no backpressure
    /// of its own; the consumer is responsible for keeping up with the
    /// stream of [`InboundFrame`]s.
    fn inbound(&self) -> mpsc::UnboundedReceiver<InboundFrame>;

    /// Drop a specific association (used by quarantine).
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if the association cannot be dropped.
    // NOTE(review): whether an unknown `target` is an error (for example
    // `NotAssociated`) or a silent no-op is implementation-defined here —
    // confirm per transport.
    async fn disassociate(&self, target: &Address) -> Result<(), TransportError>;

    /// Stop listening and close all associations.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if shutdown does not complete cleanly.
    async fn shutdown(&self) -> Result<(), TransportError>;
}