//! Always-ready HTTP client channels for gRPC or other RPC-like requests.
//!
//! In a microservices environment, components usually serve RPCs from their
//! clients in part by making further requests to backends of their own. For
//! example an application frontend serves a request by making a query to a
//! storage backend and a notification queue before applying business logic
//! and constructing a response to send to its own client, the end user.
//!
//! When RPC frontends and backends are both replicated as multiple tasks,
//! discipline in managing and load balancing the flow of requests is
//! important. This crate aims to offer the client side of that function.
//!
//! The main focus is on offering an always-ready gRPC client channel type
//! (in the [`grpc`] module) which load-balances over multiple individual
//! actively health-checked connections. A generic HTTP client channel type
//! (in the [`http`] module) is also provided, but it is currently less
//! polished.
//!
//! The building blocks revolve around [`tower`] (for the service stack) and
//! [`hyper`] (for HTTP), plus [`tokio`] and [`rustls`]. In particular, the
//! basic load balancing uses [`tower::balance::p2c`]. Finally, the gRPC
//! layer on top is designed to be used with [`tonic`].
//!
//! Some of the features brought by this crate are:
//!
//! - As soon as they are created, and continuing in the background for
//! their lifetime, channels attempt to maintain a sufficiently sized pool
//! of healthy member connections to backends.
//! - If multiple backend addresses are available, the channel will attempt
//! to use all of them, using different addresses for different member
//! connections, mitigating the effect of single backend tasks going away.
//! - Name resolution follows DNS TTLs, so that if the backend is using
//! DNS-based load balancing the channel notices and reacts when its
//! assignment changes.
//! - Member connections are individually health-checked and evicted from
//! the channel when they fail. Note that this is different from plain
//! [`tower::balance::p2c::Balance`], which only polls (and potentially
//! evicts) members on use.
//! - Channels that become critically unhealthy (too few members are
//! healthy) are handled in a degraded mode: we temporarily make connected
//! but unhealthy members available to accept requests and stop evicting them.
//!
//! This crate can be used by itself but is designed to be used with
//! [`comprehensive`] which will further add the following features:
//!
//! - Easy macro-based declaration of a gRPC client made available as a
//! resource to the rest of the assembly.
//! - Automatically connected to the assembly-wide health status signal so
//! that when a client channel to a required backend is unhealthy then the
//! colocated server resources also report unhealthy.
//! - TLS configuration supplied from the assembly-wide TLS resource,
//! reflecting the use of a process-wide cryptographic identity and
//! dynamically reloaded when changed (such as on certificate rotation).
//! - Configuration of the channel's backend URI and other configurable
//! properties using a standard set of flags.
//!
//! # gRPC client example using warm_channels directly
//!
//! ```
//! use std::sync::Arc;
//! use hickory_resolver::TokioResolver;
//!
//! # tokio_test::block_on(async {
//! let r = Arc::new(TokioResolver::builder_tokio().unwrap().build());
//! let uri = "https://example.org".try_into().unwrap();
//! let stream = warm_channels::resolve_uri(&uri, r).unwrap();
//! # #[cfg(feature = "grpc")]
//! let (stack, worker) = warm_channels::grpc_channel(
//! uri.clone(),
//! warm_channels::grpc::GRPCChannelConfig::default(),
//! "demo",
//! warm_channels::stream::TCPConnector::default(),
//! stream,
//! |h| println!("healthy: {}", h),
//! );
//! # #[cfg(feature = "grpc")]
//! tokio::task::spawn(worker);
//! # // Fake for demo
//! # mod pb { pub mod test_client {
//! # pub struct TestClient;
//! # impl TestClient {
//! # pub fn with_origin<T, U>(t: T, u: U) -> Self { Self }
//! # pub async fn greet<T>(self, t: T) {}
//! # }
//! # } }
//! # #[cfg(feature = "grpc")]
//! let client = pb::test_client::TestClient::with_origin(stack, uri);
//!
//! # #[cfg(feature = "grpc")]
//! println!("{:?}", client.greet(tonic::Request::new(())).await);
//! # });
//! ```
//!
//! # gRPC client example using [`comprehensive`]
//!
//! ```
//! use comprehensive_grpc::GrpcClient;
//! # // Fake for demo
//! # mod pb { pub mod test_client {
//! # #[derive(Clone)] pub struct TestClient<T: Clone>(T);
//! # impl<T: Clone> TestClient<T> {
//! # pub fn with_origin<U>(t: T, u: U) -> Self { Self(t) }
//! # }
//! # } }
//!
//! #[derive(GrpcClient)]
//! struct Client(
//! pb::test_client::TestClient<comprehensive_grpc::client::Channel>,
//! comprehensive_grpc::client::ClientWorker,
//! );
//! ```
//!
//! `Client` may then be included as a dependency in a Comprehensive Assembly.
//! See the full [gRPC hello world client example].
//!
//! # Possible future work:
//!
//! - Dynamically sized member set, probably based on reacting to request
//! processing latency.
//!
//! # Features
//!
//! - **grpc**: Enable gRPC functionality
//! - **tls**: Enable TLS support
//! - **metrics**: Export metrics about channel health and gRPC requests
//! - **unix**: Enable the UNIX domain sockets connector.
//!
//! All are enabled by default.
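//!
//! If only a subset of the functionality is needed, the defaults can be
//! disabled in the usual Cargo way. A sketch of a manifest entry (the
//! version below is a placeholder, not a recommendation):
//!
//! ```toml
//! [dependencies]
//! warm_channels = { version = "*", default-features = false, features = ["grpc"] }
//! ```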
//!
//! [`comprehensive`]: https://docs.rs/comprehensive/latest/comprehensive/
//! [`rustls`]: https://docs.rs/rustls/latest/rustls/
//! [`tonic`]: https://docs.rs/tonic/latest/tonic/
//! [gRPC hello world client example]: https://github.com/vandry/comprehensive/blob/master/examples/src/helloworld-grpc-client.rs
// Would impose a requirement for rustc 1.88
// https://github.com/rust-lang/rust/pull/132833
extern crate assert_matches;

use humantime::format_rfc3339;
use std::future::Future;
use std::sync::Arc;
use std::time::SystemTime;
use tower::Service;
/// A constructor for the streaming byte IO channel over which load-balanced
/// member channels will be built.
///
/// This takes a generic network address type `A` (usually a
/// [`std::net::SocketAddr`]) and delivers a raw byte IO channel connected
/// to a server at that address.
///
/// Users should usually use one of these implementations of the trait:
///
/// ```
/// // For plaintext only, can connect to TCP/IP addresses or UNIX sockets:
/// warm_channels::stream::StreamConnector::default();
/// // For plaintext or TLS, depending on the URI:
/// # #[cfg(feature = "tls")]
/// # let uri = "https://host".try_into().unwrap();
/// # use std::sync::Arc;
/// # #[cfg(feature = "tls")]
/// # let tls_config = tokio_rustls::rustls::client::ClientConfig::builder()
/// # .with_root_certificates(tokio_rustls::rustls::RootCertStore::empty())
/// # .with_no_client_auth();
/// # #[cfg(feature = "tls")]
/// warm_channels::tls::TLSConnector::new(
/// warm_channels::stream::StreamConnector::default(),
/// &uri, Some(&tls_config));
/// ```
/// A health-checking implementation for individual channels that are part of
/// a load-balanced set.