tokio_etcd_grpc_client/
lib.rs

1mod auth_service;
2mod pb;
3
4use core::fmt;
5use std::{
6    fmt::{Display, Formatter},
7    sync::Arc,
8    time::Duration,
9};
10
11use auth_service::AuthService;
12
13use http::{uri::InvalidUri, HeaderValue};
14use tonic::transport::{Channel, Endpoint};
15use tower::discover::Change;
16
17pub use auth_service::AuthServiceTokenSetter;
18pub use pb::{
19    etcdserverpb::{
20        auth_client::AuthClient, cluster_client::ClusterClient, kv_client::KvClient,
21        lease_client::LeaseClient, maintenance_client::MaintenanceClient,
22        watch_client::WatchClient, *,
23    },
24    mvccpb::{event::EventType, Event, KeyValue},
25};
26
27pub type AuthedChannel = AuthService<Channel>;
28
/// URI scheme used when building the endpoint URIs of an etcd cluster.
///
/// Only plain `http` is available for now; it is also the default. The type is a fieldless
/// enum, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum EndpointSchema {
    /// Plain-text HTTP.
    #[default]
    Http,
    // Https,
}
35impl EndpointSchema {
36    fn default_port(&self) -> u16 {
37        match self {
38            EndpointSchema::Http => 2379,
39            // EndpointSchema::Https => ????,
40        }
41    }
42}
43
44impl Display for EndpointSchema {
45    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46        match self {
47            EndpointSchema::Http => write!(f, "http"),
48            // EndpointSchema::Https => write!(f, "https"),
49        }
50    }
51}
52
/// Which TCP port to use for every endpoint of a client.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum EndpointPort {
    /// Use the default port of the configured [`EndpointSchema`].
    #[default]
    DefaultForSchema,
    /// Use exactly this port, regardless of the schema.
    Custom(u16),
}
59
60#[derive(Default, Debug)]
61pub struct ClientEndpointConfig {
62    schema: EndpointSchema,
63    port: EndpointPort,
64    token: Option<HeaderValue>,
65    request_timeout: Duration,
66    connect_timeout: Duration,
67    tcp_keep_alive: Option<Duration>,
68    http2_keep_alive_interval: Duration,
69    http2_keep_alive_timeout: Duration,
70    http2_keep_alive_while_idle: bool,
71}
72
73impl ClientEndpointConfig {
74    const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
75    const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
76    const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(30);
77    const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(15);
78
79    pub fn http() -> Self {
80        Self {
81            schema: EndpointSchema::Http,
82            connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
83            request_timeout: Self::DEFAULT_REQUEST_TIMEOUT,
84            http2_keep_alive_interval: Self::DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL,
85            http2_keep_alive_timeout: Self::DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT,
86            ..Default::default()
87        }
88    }
89
90    pub fn auth_token(mut self, token: HeaderValue) -> Self {
91        self.token = Some(token);
92        self
93    }
94
95    /// Default: 5s.
96    pub fn request_timeout(mut self, timeout: Duration) -> Self {
97        self.request_timeout = timeout;
98        self
99    }
100
101    /// Default: 3s.
102    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
103        self.connect_timeout = timeout;
104        self
105    }
106
107    /// Default: None.
108    pub fn tcp_keep_alive(mut self, timeout: Duration) -> Self {
109        self.tcp_keep_alive = Some(timeout);
110        self
111    }
112
113    /// Default: 30s
114    pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
115        self.http2_keep_alive_interval = interval;
116        self
117    }
118
119    /// Defualt: 15s
120    pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
121        self.http2_keep_alive_timeout = timeout;
122        self
123    }
124
125    fn configure(&self, endpoint: Endpoint) -> Endpoint {
126        endpoint
127            .timeout(self.request_timeout)
128            .connect_timeout(self.connect_timeout)
129            .tcp_keepalive(self.tcp_keep_alive)
130            .http2_keep_alive_interval(self.http2_keep_alive_interval)
131            .keep_alive_while_idle(self.http2_keep_alive_while_idle)
132            .keep_alive_timeout(self.http2_keep_alive_timeout)
133            .user_agent(concat!(
134                env!("CARGO_PKG_NAME"),
135                "/",
136                env!("CARGO_PKG_VERSION")
137            ))
138            .expect("invariant: user-agent should always be valid")
139    }
140
141    fn port(&self) -> u16 {
142        match self.port {
143            EndpointPort::DefaultForSchema => self.schema.default_port(),
144            EndpointPort::Custom(port) => port,
145        }
146    }
147}
148
149// todo: better name for this?
150struct InnerClients {
151    // This order is as defined in the proto, so we're keeping it.
152    kv: KvClient<AuthedChannel>,
153    watch: WatchClient<AuthedChannel>,
154    lease: LeaseClient<AuthedChannel>,
155    cluster: ClusterClient<AuthedChannel>,
156    maintenance: MaintenanceClient<AuthedChannel>,
157    auth: AuthClient<AuthedChannel>,
158
159    // extras:
160    token_setter: AuthServiceTokenSetter,
161}
162
163#[derive(Clone)]
164pub struct EtcdGrpcClient {
165    // Client tries to be extremely cheap to clone by only having 1 arc inside of it, rather than,
166    // having the arc of each kvclient, watchclient, etc.
167    inner: Arc<InnerClients>,
168}
169
170impl InnerClients {
171    fn new(channel: AuthedChannel, token_setter: AuthServiceTokenSetter) -> Self {
172        Self {
173            kv: KvClient::new(channel.clone()),
174            watch: WatchClient::new(channel.clone()),
175            lease: LeaseClient::new(channel.clone()),
176            cluster: ClusterClient::new(channel.clone()),
177            maintenance: MaintenanceClient::new(channel.clone()),
178            auth: AuthClient::new(channel),
179            token_setter,
180        }
181    }
182}
183
184impl EtcdGrpcClient {
185    pub fn set_auth_token(&self, token: http::HeaderValue) {
186        self.inner.token_setter.set_token(token);
187    }
188
189    pub fn clear_auth_token(&self) {
190        self.inner.token_setter.clear_token();
191    }
192
193    pub fn kv(&self) -> KvClient<AuthedChannel> {
194        self.inner.kv.clone()
195    }
196
197    pub fn watch(&self) -> WatchClient<AuthedChannel> {
198        self.inner.watch.clone()
199    }
200
201    pub fn lease(&self) -> LeaseClient<AuthedChannel> {
202        self.inner.lease.clone()
203    }
204
205    pub fn cluster(&self) -> ClusterClient<AuthedChannel> {
206        self.inner.cluster.clone()
207    }
208
209    pub fn maintenance(&self) -> MaintenanceClient<AuthedChannel> {
210        self.inner.maintenance.clone()
211    }
212
213    pub fn auth(&self) -> AuthClient<AuthedChannel> {
214        self.inner.auth.clone()
215    }
216}
217
218/// Create a gRPC client [`Channel`] from a list of etcd endpoints.
219pub fn client(
220    hostnames: impl IntoIterator<Item = impl AsRef<str>> + ExactSizeIterator,
221    endpoint_config: ClientEndpointConfig,
222) -> Result<EtcdGrpcClient, InvalidUri> {
223    let (channel, tx) = Channel::balance_channel(hostnames.len());
224    for hostname in hostnames.into_iter() {
225        let endpoint = endpoint_for_hostname(hostname.as_ref(), &endpoint_config)?;
226        tx.try_send(Change::Insert(endpoint.uri().clone(), endpoint))
227            .expect("invariant: sending on channel cannot fail, as capacity is same as number of endpoints");
228    }
229
230    let (channel, token_setter) = AuthService::pair(channel, endpoint_config.token);
231    Ok(EtcdGrpcClient {
232        inner: Arc::new(InnerClients::new(channel, token_setter)),
233    })
234}
235
236fn endpoint_for_hostname(
237    hostname: &str,
238    cfg: &ClientEndpointConfig,
239) -> Result<Endpoint, InvalidUri> {
240    let url = format!("{}://{hostname}:{}", cfg.schema, cfg.port());
241    Ok(cfg.configure(Channel::builder(url.parse()?)))
242}