rpc_runtime_transport/
lib.rs1use std::future::Future;
2use std::io;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use rpc_runtime_codec_msgpack::CodecError;
8use rpc_runtime_core::Envelope;
9use rpc_runtime_errors::{RuntimeError, RuntimeErrorCode};
10use thiserror::Error;
11
12pub type TransportFuture<'a, T> =
13 Pin<Box<dyn Future<Output = Result<T, TransportError>> + Send + 'a>>;
14
15#[derive(Debug, Error)]
16pub enum TransportError {
17 #[error("transport I/O error: {0}")]
18 Io(#[from] io::Error),
19 #[error("transport protocol error: {0}")]
20 Runtime(RuntimeError),
21}
22
23impl TransportError {
24 pub fn runtime(code: RuntimeErrorCode, message: impl Into<String>) -> Self {
25 Self::Runtime(RuntimeError::protocol(code, message))
26 }
27}
28
29impl From<CodecError> for TransportError {
30 fn from(value: CodecError) -> Self {
31 Self::Runtime(value.into_runtime_error())
32 }
33}
34
35pub trait EnvelopeWriter: Send + Sync {
36 fn send_envelope<'a>(&'a self, envelope: &'a Envelope) -> TransportFuture<'a, ()>;
37
38 fn shutdown<'a>(&'a self) -> TransportFuture<'a, ()>;
39}
40
41pub trait EnvelopeReader: Send {
42 fn recv_envelope<'a>(&'a mut self) -> TransportFuture<'a, Option<Envelope>>;
43}
44
45pub trait RpcListener: Send {
46 fn accept<'a>(&'a mut self) -> TransportFuture<'a, RpcConnection>;
47
48 fn set_connection_scope(&mut self, _: ConnectionScope) {}
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum ConnectionScope {
53 LocalOnly,
54 RemoteAllowed,
55}
56
57impl Default for ConnectionScope {
58 fn default() -> Self {
59 Self::LocalOnly
60 }
61}
62
63pub fn is_local_socket_addr(addr: &SocketAddr) -> bool {
64 addr.ip().is_loopback()
65}
66
67#[derive(Clone)]
68pub struct RpcSender {
69 inner: Arc<dyn EnvelopeWriter>,
70}
71
72impl RpcSender {
73 pub fn new(inner: Arc<dyn EnvelopeWriter>) -> Self {
74 Self { inner }
75 }
76
77 pub async fn send_envelope(&self, envelope: &Envelope) -> Result<(), TransportError> {
78 self.inner.send_envelope(envelope).await
79 }
80
81 pub async fn shutdown(&self) -> Result<(), TransportError> {
82 self.inner.shutdown().await
83 }
84}
85
86pub struct RpcReceiver {
87 inner: Box<dyn EnvelopeReader>,
88}
89
90impl RpcReceiver {
91 pub fn new(inner: Box<dyn EnvelopeReader>) -> Self {
92 Self { inner }
93 }
94
95 pub async fn recv_envelope(&mut self) -> Result<Option<Envelope>, TransportError> {
96 self.inner.recv_envelope().await
97 }
98}
99
100pub struct RpcConnection {
101 sender: RpcSender,
102 receiver: RpcReceiver,
103}
104
105impl RpcConnection {
106 pub fn new(sender: RpcSender, receiver: RpcReceiver) -> Self {
107 Self { sender, receiver }
108 }
109
110 pub fn split(self) -> (RpcSender, RpcReceiver) {
111 (self.sender, self.receiver)
112 }
113}