Skip to main content

rpc_runtime_transport/
lib.rs

1use std::future::Future;
2use std::io;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use rpc_runtime_codec_msgpack::CodecError;
8use rpc_runtime_core::Envelope;
9use rpc_runtime_errors::{RuntimeError, RuntimeErrorCode};
10use thiserror::Error;
11
12pub type TransportFuture<'a, T> =
13    Pin<Box<dyn Future<Output = Result<T, TransportError>> + Send + 'a>>;
14
15#[derive(Debug, Error)]
16pub enum TransportError {
17    #[error("transport I/O error: {0}")]
18    Io(#[from] io::Error),
19    #[error("transport protocol error: {0}")]
20    Runtime(RuntimeError),
21}
22
23impl TransportError {
24    pub fn runtime(code: RuntimeErrorCode, message: impl Into<String>) -> Self {
25        Self::Runtime(RuntimeError::protocol(code, message))
26    }
27}
28
29impl From<CodecError> for TransportError {
30    fn from(value: CodecError) -> Self {
31        Self::Runtime(value.into_runtime_error())
32    }
33}
34
35pub trait EnvelopeWriter: Send + Sync {
36    fn send_envelope<'a>(&'a self, envelope: &'a Envelope) -> TransportFuture<'a, ()>;
37
38    fn shutdown<'a>(&'a self) -> TransportFuture<'a, ()>;
39}
40
41pub trait EnvelopeReader: Send {
42    fn recv_envelope<'a>(&'a mut self) -> TransportFuture<'a, Option<Envelope>>;
43}
44
45pub trait RpcListener: Send {
46    fn accept<'a>(&'a mut self) -> TransportFuture<'a, RpcConnection>;
47
48    fn set_connection_scope(&mut self, _: ConnectionScope) {}
49}
50
/// Scope setting handed to `RpcListener::set_connection_scope`.
///
/// The default value is [`ConnectionScope::LocalOnly`]. This type only
/// carries the choice; acting on it is left to listener implementations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ConnectionScope {
    /// Default scope.
    ///
    /// NOTE(review): presumably restricts a listener to local peers (see
    /// `is_local_socket_addr`); confirm against the listener implementations.
    #[default]
    LocalOnly,
    /// NOTE(review): presumably also admits non-local peers; confirm against
    /// the listener implementations.
    RemoteAllowed,
}
62
/// Returns `true` when `addr` carries a loopback IP address.
///
/// Recognised as loopback:
/// - IPv4 `127.0.0.0/8`,
/// - IPv6 `::1`,
/// - IPv4-mapped IPv6 addresses whose embedded IPv4 address is loopback
///   (for example `::ffff:127.0.0.1`).
///
/// The last case matters because `Ipv6Addr::is_loopback` is true only for
/// `::1`, while an IPv4 loopback peer connecting to a dual-stack IPv6 socket
/// is reported in the IPv4-mapped form. The port is ignored.
pub fn is_local_socket_addr(addr: &SocketAddr) -> bool {
    match addr.ip() {
        std::net::IpAddr::V4(v4) => v4.is_loopback(),
        std::net::IpAddr::V6(v6) => {
            v6.is_loopback() || matches!(v6.to_ipv4_mapped(), Some(v4) if v4.is_loopback())
        }
    }
}
66
67#[derive(Clone)]
68pub struct RpcSender {
69    inner: Arc<dyn EnvelopeWriter>,
70}
71
72impl RpcSender {
73    pub fn new(inner: Arc<dyn EnvelopeWriter>) -> Self {
74        Self { inner }
75    }
76
77    pub async fn send_envelope(&self, envelope: &Envelope) -> Result<(), TransportError> {
78        self.inner.send_envelope(envelope).await
79    }
80
81    pub async fn shutdown(&self) -> Result<(), TransportError> {
82        self.inner.shutdown().await
83    }
84}
85
86pub struct RpcReceiver {
87    inner: Box<dyn EnvelopeReader>,
88}
89
90impl RpcReceiver {
91    pub fn new(inner: Box<dyn EnvelopeReader>) -> Self {
92        Self { inner }
93    }
94
95    pub async fn recv_envelope(&mut self) -> Result<Option<Envelope>, TransportError> {
96        self.inner.recv_envelope().await
97    }
98}
99
100pub struct RpcConnection {
101    sender: RpcSender,
102    receiver: RpcReceiver,
103}
104
105impl RpcConnection {
106    pub fn new(sender: RpcSender, receiver: RpcReceiver) -> Self {
107        Self { sender, receiver }
108    }
109
110    pub fn split(self) -> (RpcSender, RpcReceiver) {
111        (self.sender, self.receiver)
112    }
113}