1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
use core::future::Future;
use agnostic_lite::RuntimeLite;
use bytes::Bytes;
pub use nodecraft::{CheapClone, resolver::AddressResolver, *};
use crate::proto::*;
use super::*;
mod stream;
pub use stream::*;
/// An unimplemented transport for testing purposes only.
pub mod unimplemented;
/// The connection
pub trait Connection: Send + Sync {
/// The reader of the connection
type Reader: ProtoReader + Unpin;
/// The writer of the connection
type Writer: ProtoWriter + Unpin;
/// Splits the connection into a reader and a writer
fn split(self) -> (Self::Reader, Self::Writer);
/// Read the payload from the proto reader to the buffer
///
/// Returns the number of bytes read
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
/// Read exactly the payload from the proto reader to the buffer
fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
/// Peek the payload from the proto reader to the buffer
///
/// Returns the number of bytes peeked
fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
/// Peek exactly the payload from the proto reader to the buffer
fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
/// Consume the content in peek buffer
fn consume_peek(&mut self);
/// Write the payload to the proto writer
fn write_all(&mut self, payload: &[u8]) -> impl Future<Output = std::io::Result<()>> + Send;
/// Flush the proto writer
fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
/// Close the proto writer
fn close(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
}
/// An error for the transport layer.
pub trait TransportError: From<std::io::Error> + std::error::Error + Send + Sync + 'static {
/// Returns `true` if the error is a remote failure.
///
/// e.g. Errors happened when:
/// 1. Fail to send to a remote node
/// 2. Fail to receive from a remote node.
/// 3. Fail to dial a remote node.
///
/// The above errors can be treated as remote failures.
fn is_remote_failure(&self) -> bool;
/// Custom the error.
fn custom(err: std::borrow::Cow<'static, str>) -> Self;
}
/// Transport is used to abstract over communicating with other peers. The packet
/// interface is assumed to be best-effort and the stream interface is assumed to
/// be reliable.
pub trait Transport: Sized + Send + Sync + 'static {
/// The error type for the transport
type Error: TransportError;
/// The id type used to identify nodes
type Id: Id + Data + Send + Sync + 'static;
/// The address type of the node
type Address: Address + Send + Sync + 'static;
/// The resolved address type of the node
type ResolvedAddress: Data
+ CheapClone
+ core::hash::Hash
+ Eq
+ Ord
+ core::fmt::Debug
+ core::fmt::Display
+ Send
+ Sync
+ 'static;
/// The address resolver used to resolve addresses
type Resolver: AddressResolver<
Runtime = Self::Runtime,
Address = Self::Address,
ResolvedAddress = Self::ResolvedAddress,
>;
/// The promised connection used to send and receive messages
type Connection: Connection + Send + Sync + 'static;
/// The async runtime
type Runtime: RuntimeLite;
/// The options used to construct the transport
type Options: Send + Sync + 'static;
/// Creates a new transport with the given options
fn new(options: Self::Options) -> impl Future<Output = Result<Self, Self::Error>> + Send;
/// Resolves the given address to a resolved address
fn resolve(
&self,
addr: &<Self::Resolver as AddressResolver>::Address,
) -> impl Future<Output = Result<Self::ResolvedAddress, Self::Error>> + Send;
/// Returns the local id of the node
fn local_id(&self) -> &Self::Id;
/// Returns the local address of the node
fn local_address(&self) -> &<Self::Resolver as AddressResolver>::Address;
/// Returns the advertise address of the node
fn advertise_address(&self) -> &Self::ResolvedAddress;
/// Returns the maximum size of a packet that can be sent
fn max_packet_size(&self) -> usize;
/// Returns the header overhead in bytes this transport.
fn header_overhead(&self) -> usize;
/// Returns an error if the given address is blocked
fn blocked_address(&self, addr: &Self::ResolvedAddress) -> Result<(), Self::Error>;
/// This is a hook that allows the transport to perform any extra work
/// when the memberlist accepts a new connection from a remote node.
///
/// This method will be invoked before the [`Memberlist`] starts to process the data owned
/// by the memberlist protocol.
///
/// Nomally, the implementation should do nothing, just return the given conn
/// back to the caller.
///
/// If the transport sends extra data (e.g. a header) in [`Transport::write`], then in this method,
/// the implementor should consume the extra data sent by the remote node.
///
/// e.g.
///
/// If in [`Transport::write`], the implementor prepends a header to the payload,
/// then in this method, the implementor should consume the header.
fn read(
&self,
from: &Self::ResolvedAddress,
conn: &mut <Self::Connection as Connection>::Reader,
) -> impl Future<Output = Result<usize, Self::Error>> + Send {
async move {
let _from = from;
let _conn = conn;
Ok(0)
}
}
/// Sends a message to the remote node by promised connection.
///
/// The `payload` has already been processed by [`ProtoEncoder`].
///
/// The implementor can send more data to the remote node,
/// but it must before sending the `payload`.
///
/// Returns the number of bytes sent.
fn write(
&self,
conn: &mut <Self::Connection as Connection>::Writer,
payload: Payload,
) -> impl Future<Output = Result<usize, Self::Error>> + Send {
async move {
let src = payload.as_slice();
let len = src.len();
conn.write_all(src).await?;
conn.flush().await?;
tracing::trace!(data=?src, "memberlist.transport: promised connection");
Ok(len)
}
}
/// A packet-oriented interface that fires off the given
/// payload to the given address in a connectionless fashion.
///
/// The payload is not compressed or encrypted, the transport
/// layer is expected to handle that by themselves.
///
/// # Returns
///
/// - number of bytes sent
/// - a time stamp that's as close as possible to when the packet
/// was transmitted to help make accurate RTT measurements during probes.
fn send_to(
&self,
addr: &Self::ResolvedAddress,
packet: Payload,
) -> impl Future<Output = Result<(usize, <Self::Runtime as RuntimeLite>::Instant), Self::Error>> + Send;
/// Used to create a bidirection connection that allows us to perform
/// two-way communication with a peer. This is generally more expensive
/// than packet connections so is used for more infrequent operations
/// such as anti-entropy or fallback probes if the packet-oriented probe
/// failed.
fn open(
&self,
addr: &Self::ResolvedAddress,
deadline: <Self::Runtime as RuntimeLite>::Instant,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
/// Returns a packet subscriber that can be used to receive incoming packets
fn packet(
&self,
) -> PacketSubscriber<Self::ResolvedAddress, <Self::Runtime as RuntimeLite>::Instant>;
/// Returns a receiver that can be read to handle incoming stream
/// connections from other peers. How this is set up for listening is
/// left as an exercise for the concrete transport implementations.
fn stream(&self) -> StreamSubscriber<Self::ResolvedAddress, Self::Connection>;
/// Returns `true` if the transport provides provides reliable packets delivery.
///
/// When `true`, the [`Memberlist`] will not include checksums in packets
/// even if a [`ChecksumAlgorithm`] is configured in [`Options`],
/// since the transport already guarantees data integrity.
///
/// # Examples
///
/// - Reliable: TCP, QUIC
/// - Unreliable: UDP
fn packet_reliable(&self) -> bool;
/// Returns `true` if the transport provides packets security.
///
/// When `true`, the [`Memberlist`] will not perform additional payload
/// encryption even if a [`EncryptionAlgorithm`] is configured in [`Options`],
/// since the transport already provides packet security.
///
/// # Examples
///
/// - Secure: QUIC, TLS
/// - Insecure: TCP, UDP
fn packet_secure(&self) -> bool;
/// Returns `true` if the transport provides stream security.
///
/// When `true`, the [`Memberlist`] will not perform additional payload
/// encryption even if a [`EncryptionAlgorithm`] is configured in [`Options`],
/// since the transport already provides stream security.
///
/// # Examples
///
/// - Secure: QUIC, TLS
/// - Insecure: TCP
fn stream_secure(&self) -> bool;
/// Shutdown the transport
fn shutdown(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}