tokio_etcd_grpc_client/
lib.rs1mod auth_service;
2mod pb;
3
4use core::fmt;
5use std::{
6 fmt::{Display, Formatter},
7 sync::Arc,
8 time::Duration,
9};
10
11use auth_service::AuthService;
12
13use http::{uri::InvalidUri, HeaderValue};
14use tonic::transport::{Channel, Endpoint};
15use tower::discover::Change;
16
17pub use auth_service::AuthServiceTokenSetter;
18pub use pb::{
19 etcdserverpb::{
20 auth_client::AuthClient, cluster_client::ClusterClient, kv_client::KvClient,
21 lease_client::LeaseClient, maintenance_client::MaintenanceClient,
22 watch_client::WatchClient, *,
23 },
24 mvccpb::{event::EventType, Event, KeyValue},
25};
26
27pub type AuthedChannel = AuthService<Channel>;
28
/// URI scheme used when building the address of an etcd endpoint.
///
/// The variant's [`Display`] output is what ends up in front of `://` in the
/// endpoint URL.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum EndpointSchema {
    /// Plain-text HTTP; this is the default schema.
    #[default]
    Http,
}
35impl EndpointSchema {
36 fn default_port(&self) -> u16 {
37 match self {
38 EndpointSchema::Http => 2379,
39 }
41 }
42}
43
44impl Display for EndpointSchema {
45 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46 match self {
47 EndpointSchema::Http => write!(f, "http"),
48 }
50 }
51}
52
/// Which TCP port an endpoint URL should carry.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum EndpointPort {
    /// Use whatever port the configured schema defines as its default.
    #[default]
    DefaultForSchema,
    /// Use exactly this port, regardless of schema.
    Custom(u16),
}
59
60#[derive(Default, Debug)]
61pub struct ClientEndpointConfig {
62 schema: EndpointSchema,
63 port: EndpointPort,
64 token: Option<HeaderValue>,
65 request_timeout: Duration,
66 connect_timeout: Duration,
67 tcp_keep_alive: Option<Duration>,
68 http2_keep_alive_interval: Duration,
69 http2_keep_alive_timeout: Duration,
70 http2_keep_alive_while_idle: bool,
71}
72
73impl ClientEndpointConfig {
74 const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
75 const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
76 const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(30);
77 const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(15);
78
79 pub fn http() -> Self {
80 Self {
81 schema: EndpointSchema::Http,
82 connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
83 request_timeout: Self::DEFAULT_REQUEST_TIMEOUT,
84 http2_keep_alive_interval: Self::DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL,
85 http2_keep_alive_timeout: Self::DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT,
86 ..Default::default()
87 }
88 }
89
90 pub fn auth_token(mut self, token: HeaderValue) -> Self {
91 self.token = Some(token);
92 self
93 }
94
95 pub fn request_timeout(mut self, timeout: Duration) -> Self {
97 self.request_timeout = timeout;
98 self
99 }
100
101 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
103 self.connect_timeout = timeout;
104 self
105 }
106
107 pub fn tcp_keep_alive(mut self, timeout: Duration) -> Self {
109 self.tcp_keep_alive = Some(timeout);
110 self
111 }
112
113 pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
115 self.http2_keep_alive_interval = interval;
116 self
117 }
118
119 pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
121 self.http2_keep_alive_timeout = timeout;
122 self
123 }
124
125 fn configure(&self, endpoint: Endpoint) -> Endpoint {
126 endpoint
127 .timeout(self.request_timeout)
128 .connect_timeout(self.connect_timeout)
129 .tcp_keepalive(self.tcp_keep_alive)
130 .http2_keep_alive_interval(self.http2_keep_alive_interval)
131 .keep_alive_while_idle(self.http2_keep_alive_while_idle)
132 .keep_alive_timeout(self.http2_keep_alive_timeout)
133 .user_agent(concat!(
134 env!("CARGO_PKG_NAME"),
135 "/",
136 env!("CARGO_PKG_VERSION")
137 ))
138 .expect("invariant: user-agent should always be valid")
139 }
140
141 fn port(&self) -> u16 {
142 match self.port {
143 EndpointPort::DefaultForSchema => self.schema.default_port(),
144 EndpointPort::Custom(port) => port,
145 }
146 }
147}
148
149struct InnerClients {
151 kv: KvClient<AuthedChannel>,
153 watch: WatchClient<AuthedChannel>,
154 lease: LeaseClient<AuthedChannel>,
155 cluster: ClusterClient<AuthedChannel>,
156 maintenance: MaintenanceClient<AuthedChannel>,
157 auth: AuthClient<AuthedChannel>,
158
159 token_setter: AuthServiceTokenSetter,
161}
162
163#[derive(Clone)]
164pub struct EtcdGrpcClient {
165 inner: Arc<InnerClients>,
168}
169
170impl InnerClients {
171 fn new(channel: AuthedChannel, token_setter: AuthServiceTokenSetter) -> Self {
172 Self {
173 kv: KvClient::new(channel.clone()),
174 watch: WatchClient::new(channel.clone()),
175 lease: LeaseClient::new(channel.clone()),
176 cluster: ClusterClient::new(channel.clone()),
177 maintenance: MaintenanceClient::new(channel.clone()),
178 auth: AuthClient::new(channel),
179 token_setter,
180 }
181 }
182}
183
184impl EtcdGrpcClient {
185 pub fn set_auth_token(&self, token: http::HeaderValue) {
186 self.inner.token_setter.set_token(token);
187 }
188
189 pub fn clear_auth_token(&self) {
190 self.inner.token_setter.clear_token();
191 }
192
193 pub fn kv(&self) -> KvClient<AuthedChannel> {
194 self.inner.kv.clone()
195 }
196
197 pub fn watch(&self) -> WatchClient<AuthedChannel> {
198 self.inner.watch.clone()
199 }
200
201 pub fn lease(&self) -> LeaseClient<AuthedChannel> {
202 self.inner.lease.clone()
203 }
204
205 pub fn cluster(&self) -> ClusterClient<AuthedChannel> {
206 self.inner.cluster.clone()
207 }
208
209 pub fn maintenance(&self) -> MaintenanceClient<AuthedChannel> {
210 self.inner.maintenance.clone()
211 }
212
213 pub fn auth(&self) -> AuthClient<AuthedChannel> {
214 self.inner.auth.clone()
215 }
216}
217
218pub fn client(
220 hostnames: impl IntoIterator<Item = impl AsRef<str>> + ExactSizeIterator,
221 endpoint_config: ClientEndpointConfig,
222) -> Result<EtcdGrpcClient, InvalidUri> {
223 let (channel, tx) = Channel::balance_channel(hostnames.len());
224 for hostname in hostnames.into_iter() {
225 let endpoint = endpoint_for_hostname(hostname.as_ref(), &endpoint_config)?;
226 tx.try_send(Change::Insert(endpoint.uri().clone(), endpoint))
227 .expect("invariant: sending on channel cannot fail, as capacity is same as number of endpoints");
228 }
229
230 let (channel, token_setter) = AuthService::pair(channel, endpoint_config.token);
231 Ok(EtcdGrpcClient {
232 inner: Arc::new(InnerClients::new(channel, token_setter)),
233 })
234}
235
236fn endpoint_for_hostname(
237 hostname: &str,
238 cfg: &ClientEndpointConfig,
239) -> Result<Endpoint, InvalidUri> {
240 let url = format!("{}://{hostname}:{}", cfg.schema, cfg.port());
241 Ok(cfg.configure(Channel::builder(url.parse()?)))
242}