1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::mpsc::channel;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::{
7 codegen::InterceptedService,
8 metadata::{Ascii, MetadataValue},
9 service::Interceptor,
10 transport::Channel,
11 Request, Status,
12};
13
14use crate::auth::{AuthOp, AuthenticateRequest, AuthenticateResponse};
15use crate::cluster::{
16 ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
17 MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
18};
19use crate::kv::{
20 CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
21 PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
22};
23use crate::lease::{
24 LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp, LeaseRevokeRequest,
25 LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
26};
27use crate::proto::etcdserverpb;
28use crate::proto::etcdserverpb::cluster_client::ClusterClient;
29use crate::proto::etcdserverpb::maintenance_client::MaintenanceClient;
30use crate::proto::etcdserverpb::{
31 auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
32 watch_client::WatchClient,
33};
34use crate::watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream};
35use crate::{Error, Result};
36
37#[derive(Clone)]
38pub struct TokenInterceptor {
39 token: Option<MetadataValue<Ascii>>,
40}
41
42impl TokenInterceptor {
43 fn new(token: Option<String>) -> Self {
44 Self {
45 token: token.map(|token: String| MetadataValue::from_str(&token).unwrap()),
46 }
47 }
48}
49
50impl Interceptor for TokenInterceptor {
51 fn call(&mut self, mut req: tonic::Request<()>) -> std::result::Result<Request<()>, Status> {
52 match &self.token {
53 Some(token) => {
54 req.metadata_mut().insert("authorization", token.clone());
55 Ok(req)
56 }
57 None => Ok(req),
58 }
59 }
60}
61
62#[cfg(feature = "tls")]
63#[derive(Debug, Clone)]
64enum TlsOption {
65 None,
66 WithConfig(tonic::transport::ClientTlsConfig),
67}
68
69#[cfg(not(feature = "tls"))]
70#[derive(Debug, Clone)]
71enum TlsOption {
72 None,
73}
74
75#[derive(Debug, Clone)]
76pub struct Endpoint {
77 url: String,
78
79 tls_opt: TlsOption,
80}
81
82impl Endpoint {
83 pub fn new(url: impl Into<String>) -> Self {
84 Self {
85 url: url.into(),
86 tls_opt: TlsOption::None,
87 }
88 }
89
90 #[cfg(feature = "tls")]
91 pub fn tls_raw(
92 mut self,
93 domain_name: impl Into<String>,
94 ca_cert: impl AsRef<[u8]>,
95 client_cert: impl AsRef<[u8]>,
96 client_key: impl AsRef<[u8]>,
97 ) -> Self {
98 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
99
100 let certificate = Certificate::from_pem(ca_cert);
101 let identity = Identity::from_pem(client_cert, client_key);
102
103 self.tls_opt = TlsOption::WithConfig(
104 ClientTlsConfig::new()
105 .domain_name(domain_name)
106 .ca_certificate(certificate)
107 .identity(identity),
108 );
109
110 self
111 }
112
113 #[cfg(feature = "tls")]
114 pub async fn tls(
115 self,
116 domain_name: impl Into<String>,
117 ca_cert_path: impl AsRef<std::path::Path>,
118 client_cert_path: impl AsRef<std::path::Path>,
119 client_key_path: impl AsRef<std::path::Path>,
120 ) -> Result<Self> {
121 use tokio::fs::read;
122
123 let ca_cert = read(ca_cert_path).await?;
124
125 let client_cert = read(client_cert_path).await?;
126 let client_key = read(client_key_path).await?;
127
128 Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
129 }
130}
131
132impl<T> From<T> for Endpoint
133where
134 T: Into<String>,
135{
136 fn from(url: T) -> Self {
137 Self {
138 url: url.into(),
139 tls_opt: TlsOption::None,
140 }
141 }
142}
143
144#[derive(Clone, Debug)]
145pub struct ClientConfig {
146 pub endpoints: Vec<Endpoint>,
147 pub auth: Option<(String, String)>,
148 pub connect_timeout: Duration,
149 pub http2_keep_alive_interval: Duration,
150}
151
152impl ClientConfig {
153 pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
154 Self {
155 endpoints: endpoints.into(),
156 auth: None,
157 connect_timeout: Duration::from_secs(30),
158 http2_keep_alive_interval: Duration::from_secs(5),
159 }
160 }
161
162 pub fn user_auth(&mut self, name: impl Into<String>, password: impl Into<String>) {
163 self.auth = Some((name.into(), password.into()));
164 }
165
166 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
167 self.connect_timeout = timeout;
168 self
169 }
170
171 pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
172 self.http2_keep_alive_interval = interval;
173 self
174 }
175}
176
177#[derive(Clone)]
178pub struct Client {
179 pub auth_client: AuthClient<InterceptedService<Channel, TokenInterceptor>>,
180 pub kv_client: KvClient<InterceptedService<Channel, TokenInterceptor>>,
181 pub watch_client: WatchClient<InterceptedService<Channel, TokenInterceptor>>,
182 pub cluster_client: ClusterClient<InterceptedService<Channel, TokenInterceptor>>,
183 pub maintenance_client: MaintenanceClient<InterceptedService<Channel, TokenInterceptor>>,
184 pub lease_client: LeaseClient<InterceptedService<Channel, TokenInterceptor>>,
185}
186
187impl Client {
188 pub async fn connect_with_token(cfg: &ClientConfig, token: Option<String>) -> Result<Self> {
189 let auth_interceptor = TokenInterceptor::new(token);
190
191 let channel = {
192 let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
193 for e in cfg.endpoints.iter() {
194 let mut c = Channel::from_shared(e.url.clone())?
195 .connect_timeout(cfg.connect_timeout)
196 .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
197
198 #[cfg(feature = "tls")]
199 {
200 if let TlsOption::WithConfig(tls) = e.tls_opt.clone() {
201 c = c.tls_config(tls)?;
202 }
203 }
204
205 endpoints.push(c);
206 }
207
208 Channel::balance_list(endpoints.into_iter())
209 };
210
211 let auth_client = AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone());
212 let kv_client = KvClient::with_interceptor(channel.clone(), auth_interceptor.clone());
213 let watch_client = WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone());
214 let cluster_client =
215 ClusterClient::with_interceptor(channel.clone(), auth_interceptor.clone());
216 let maintenance_client =
217 MaintenanceClient::with_interceptor(channel.clone(), auth_interceptor.clone());
218 let lease_client = LeaseClient::with_interceptor(channel, auth_interceptor);
219
220 Ok(Self {
221 auth_client,
222 kv_client,
223 watch_client,
224 cluster_client,
225 maintenance_client,
226 lease_client,
227 })
228 }
229
230 pub async fn connect(mut cfg: ClientConfig) -> Result<Self> {
231 let cli = Self::connect_with_token(&cfg, None).await?;
232
233 match cfg.auth.take() {
234 Some((name, password)) => {
235 let token = cli.authenticate((name, password)).await?.token;
236
237 Self::connect_with_token(&cfg, Some(token)).await
238 }
239 None => Ok(cli),
240 }
241 }
242}
243
244#[async_trait]
245impl AuthOp for Client {
246 async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
247 where
248 R: Into<AuthenticateRequest> + Send,
249 {
250 let req = tonic::Request::new(req.into().into());
251 let resp = self.auth_client.clone().authenticate(req).await?;
252
253 Ok(resp.into_inner().into())
254 }
255}
256
257#[async_trait]
258impl KeyValueOp for Client {
259 async fn put<R>(&self, req: R) -> Result<PutResponse>
260 where
261 R: Into<PutRequest> + Send,
262 {
263 let req = tonic::Request::new(req.into().into());
264 let resp = self.kv_client.clone().put(req).await?;
265
266 Ok(resp.into_inner().into())
267 }
268
269 async fn get<R>(&self, req: R) -> Result<RangeResponse>
270 where
271 R: Into<RangeRequest> + Send,
272 {
273 let req = tonic::Request::new(req.into().into());
274 let resp = self.kv_client.clone().range(req).await?;
275
276 Ok(resp.into_inner().into())
277 }
278
279 async fn get_all(&self) -> Result<RangeResponse> {
280 self.get(KeyRange::all()).await
281 }
282
283 async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
284 where
285 K: Into<Vec<u8>> + Send,
286 {
287 self.get(KeyRange::prefix(p)).await
288 }
289
290 async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
291 where
292 F: Into<Vec<u8>> + Send,
293 E: Into<Vec<u8>> + Send,
294 {
295 self.get(KeyRange::range(from, end)).await
296 }
297
298 async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
299 where
300 R: Into<DeleteRequest> + Send,
301 {
302 let req = tonic::Request::new(req.into().into());
303 let resp = self.kv_client.clone().delete_range(req).await?;
304
305 Ok(resp.into_inner().into())
306 }
307
308 async fn delete_all(&self) -> Result<DeleteResponse> {
309 self.delete(KeyRange::all()).await
310 }
311
312 async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
313 where
314 K: Into<Vec<u8>> + Send,
315 {
316 self.delete(KeyRange::prefix(p)).await
317 }
318
319 async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
320 where
321 F: Into<Vec<u8>> + Send,
322 E: Into<Vec<u8>> + Send,
323 {
324 self.delete(KeyRange::range(from, end)).await
325 }
326
327 async fn txn<R>(&self, req: R) -> Result<TxnResponse>
328 where
329 R: Into<TxnRequest> + Send,
330 {
331 let req = tonic::Request::new(req.into().into());
332 let resp = self.kv_client.clone().txn(req).await?;
333
334 Ok(resp.into_inner().into())
335 }
336
337 async fn compact<R>(&self, req: R) -> Result<CompactResponse>
338 where
339 R: Into<CompactRequest> + Send,
340 {
341 let req = tonic::Request::new(req.into().into());
342 let resp = self.kv_client.clone().compact(req).await?;
343
344 Ok(resp.into_inner().into())
345 }
346}
347
348#[async_trait]
349impl WatchOp for Client {
350 async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
351 where
352 R: Into<WatchCreateRequest> + Send,
353 {
354 let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
355
356 tx.send(req.into().into()).await?;
357
358 let resp = self
359 .watch_client
360 .clone()
361 .watch(ReceiverStream::new(rx))
362 .await?;
363
364 let mut inbound = resp.into_inner();
365
366 let watch_id = match inbound.message().await? {
367 Some(resp) => {
368 if !resp.created {
369 return Err(Error::WatchEvent(
370 "should receive created event at first".to_owned(),
371 ));
372 }
373 assert!(resp.events.is_empty(), "received created event {:?}", resp);
374 resp.watch_id
375 }
376
377 None => return Err(Error::CreateWatch),
378 };
379
380 Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
381 }
382}
383
384#[async_trait]
385impl LeaseOp for Client {
386 async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
387 where
388 R: Into<LeaseGrantRequest> + Send,
389 {
390 let req = tonic::Request::new(req.into().into());
391 let resp = self.lease_client.clone().lease_grant(req).await?;
392 Ok(resp.into_inner().into())
393 }
394
395 async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
396 where
397 R: Into<LeaseRevokeRequest> + Send,
398 {
399 let req = tonic::Request::new(req.into().into());
400 let resp = self.lease_client.clone().lease_revoke(req).await?;
401 Ok(resp.into_inner().into())
402 }
403
404 async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
405 let (req_tx, req_rx) = channel(1024);
406
407 let req_rx = ReceiverStream::new(req_rx);
408
409 let resp_rx = self
410 .lease_client
411 .clone()
412 .lease_keep_alive(req_rx)
413 .await?
414 .into_inner();
415
416 Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
417 }
418
419 async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
420 where
421 R: Into<LeaseTimeToLiveRequest> + Send,
422 {
423 let req = tonic::Request::new(req.into().into());
424 let resp = self.lease_client.clone().lease_time_to_live(req).await?;
425 Ok(resp.into_inner().into())
426 }
427}
428
429#[async_trait]
430impl ClusterOp for Client {
431 async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
432 where
433 R: Into<MemberAddRequest> + Send,
434 {
435 let req = tonic::Request::new(req.into().into());
436 let resp = self.cluster_client.clone().member_add(req).await?;
437
438 Ok(resp.into_inner().into())
439 }
440
441 async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
442 where
443 R: Into<MemberRemoveRequest> + Send,
444 {
445 let req = tonic::Request::new(req.into().into());
446 let resp = self.cluster_client.clone().member_remove(req).await?;
447
448 Ok(resp.into_inner().into())
449 }
450
451 async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
452 where
453 R: Into<MemberUpdateRequest> + Send,
454 {
455 let req = tonic::Request::new(req.into().into());
456 let resp = self.cluster_client.clone().member_update(req).await?;
457
458 Ok(resp.into_inner().into())
459 }
460
461 async fn member_list(&self) -> Result<MemberListResponse> {
462 let req = tonic::Request::new(MemberListRequest::new().into());
463 let resp = self.cluster_client.clone().member_list(req).await?;
464
465 Ok(resp.into_inner().into())
466 }
467}