1use std::{future::Future, sync::Arc, time::Duration};
2
3use tokio::sync::{RwLock, mpsc::channel};
4use tokio_stream::wrappers::ReceiverStream;
5use tonic::{
6 Status,
7 metadata::{Ascii, MetadataValue},
8 transport::Channel,
9};
10
11use crate::{
12 AuthDisableResponse, AuthEnableResponse, AuthRoleAddRequest, AuthRoleAddResponse,
13 AuthRoleDeleteRequest, AuthRoleDeleteResponse, AuthRoleListResponse, AuthStatusRequest,
14 AuthStatusResponse, AuthenticateRequest, Error, Result,
15 auth::{AuthOp, AuthenticateResponse},
16 cluster::{
17 ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
18 MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
19 },
20 kv::{
21 CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
22 PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
23 },
24 lease::{
25 LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp,
26 LeaseRevokeRequest, LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
27 },
28 proto::etcdserverpb,
29 proto::etcdserverpb::cluster_client::ClusterClient,
30 proto::etcdserverpb::{
31 auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
32 watch_client::WatchClient,
33 },
34 watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream},
35};
36use crate::{
37 auth::{AuthDisableRequest, AuthEnableRequest, AuthRoleListRequest},
38 proto::etcdserverpb::LeaseKeepAliveRequest,
39};
40
/// Number of attempts `Client::execute_with_retries` makes for one request
/// before it gives up and reports `Error::ExecuteFailed`.
static MAX_RETRY: i32 = 3;
42
/// A single server address, optionally paired with a TLS configuration
/// (the latter only exists when the `tls` feature is enabled).
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Address handed to `Channel::from_shared` when the channel is built.
    url: String,
    /// TLS settings applied to this endpoint's channel; `None` means no TLS
    /// configuration is applied.
    #[cfg(feature = "tls")]
    tls_opt: Option<tonic::transport::ClientTlsConfig>,
}
49
50impl Endpoint {
51 pub fn new(url: impl Into<String>) -> Self {
52 Self {
53 url: url.into(),
54 #[cfg(feature = "tls")]
55 tls_opt: None,
56 }
57 }
58
59 #[cfg(feature = "tls")]
60 pub fn tls_raw(
61 mut self,
62 domain_name: impl Into<String>,
63 ca_cert: impl AsRef<[u8]>,
64 client_cert: impl AsRef<[u8]>,
65 client_key: impl AsRef<[u8]>,
66 ) -> Self {
67 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
68
69 let certificate = Certificate::from_pem(ca_cert);
70 let identity = Identity::from_pem(client_cert, client_key);
71
72 self.tls_opt = Some(
73 ClientTlsConfig::new()
74 .domain_name(domain_name)
75 .ca_certificate(certificate)
76 .identity(identity),
77 );
78
79 self
80 }
81
82 #[cfg(feature = "tls")]
83 pub async fn tls(
84 self,
85 domain_name: impl Into<String>,
86 ca_cert_path: impl AsRef<std::path::Path>,
87 client_cert_path: impl AsRef<std::path::Path>,
88 client_key_path: impl AsRef<std::path::Path>,
89 ) -> Result<Self> {
90 use tokio::fs::read;
91
92 let ca_cert = read(ca_cert_path).await?;
93 let client_cert = read(client_cert_path).await?;
94 let client_key = read(client_key_path).await?;
95
96 Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
97 }
98}
99
100impl<T> From<T> for Endpoint
101where
102 T: Into<String>,
103{
104 fn from(url: T) -> Self {
105 Self {
106 url: url.into(),
107 #[cfg(feature = "tls")]
108 tls_opt: None,
109 }
110 }
111}
112
/// Settings used to build a [`Client`].
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Endpoints the client's channel is balanced over.
    pub endpoints: Vec<Endpoint>,
    /// Optional `(name, password)` pair used to authenticate.
    pub auth: Option<(String, String)>,
    /// Connect timeout applied to every endpoint (30 seconds by default).
    pub connect_timeout: Duration,
    /// HTTP/2 keep-alive interval applied to every endpoint (5 seconds by default).
    pub http2_keep_alive_interval: Duration,
}
121
122impl ClientConfig {
123 pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
124 Self {
125 endpoints: endpoints.into(),
126 auth: None,
127 connect_timeout: Duration::from_secs(30),
128 http2_keep_alive_interval: Duration::from_secs(5),
129 }
130 }
131
132 pub fn auth(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
133 self.auth = Some((name.into(), password.into()));
134 self
135 }
136
137 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
138 self.connect_timeout = timeout;
139 self
140 }
141
142 pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
143 self.http2_keep_alive_interval = interval;
144 self
145 }
146}
147
/// Client bundling the generated gRPC service clients, all created from the
/// same channel.
///
/// The token is stored behind an `Arc`, so clones of a `Client` share it.
#[derive(Clone)]
pub struct Client {
    auth_client: AuthClient<Channel>,
    kv_client: KvClient<Channel>,
    watch_client: WatchClient<Channel>,
    cluster_client: ClusterClient<Channel>,
    lease_client: LeaseClient<Channel>,
    /// Most recently obtained auth token; when present it is inserted as the
    /// `authorization` metadata entry of outgoing requests.
    token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
    /// `(username, password)` used to obtain a token; `None` when the client
    /// was configured without credentials.
    auth_user: Option<(String, String)>,
}
159
160impl AuthOp for Client {
161 async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
162 where
163 R: Into<AuthenticateRequest>,
164 {
165 let req = tonic::Request::new(req.into().into());
166 let resp = self.auth_client.clone().authenticate(req).await?;
167
168 Ok(resp.into_inner().into())
169 }
170
171 async fn auth_status(&self) -> Result<AuthStatusResponse> {
172 let req = tonic::Request::new(AuthStatusRequest::default().into());
173 let resp = match self.auth_user {
174 Some(_) => {
175 self.execute_with_retries(req, |req| async {
176 self.auth_client.clone().auth_status(req).await
177 })
178 .await?
179 }
180 None => self.auth_client.clone().auth_status(req).await?,
181 };
182
183 Ok(resp.into_inner().into())
184 }
185
186 async fn auth_enable(&self) -> Result<AuthEnableResponse> {
187 let req = tonic::Request::new(AuthEnableRequest::default().into());
188 let resp = match self.auth_user {
189 Some(_) => {
190 self.execute_with_retries(req, |req| async {
191 self.auth_client.clone().auth_enable(req).await
192 })
193 .await?
194 }
195 None => self.auth_client.clone().auth_enable(req).await?,
196 };
197
198 Ok(resp.into_inner().into())
199 }
200
201 async fn auth_disable(&self) -> Result<AuthDisableResponse> {
202 let req = tonic::Request::new(AuthDisableRequest::default().into());
203 let resp = match self.auth_user {
204 Some(_) => {
205 self.execute_with_retries(req, |req| async {
206 self.auth_client.clone().auth_disable(req).await
207 })
208 .await?
209 }
210 None => self.auth_client.clone().auth_disable(req).await?,
211 };
212
213 Ok(resp.into_inner().into())
214 }
215
216 async fn role_add<R>(&self, req: R) -> Result<AuthRoleAddResponse>
217 where
218 R: Into<AuthRoleAddRequest>,
219 {
220 let req = tonic::Request::new(req.into().into());
221 let resp = match self.auth_user {
222 Some(_) => {
223 self.execute_with_retries(req, |req| async {
224 self.auth_client.clone().role_add(req).await
225 })
226 .await?
227 }
228 None => self.auth_client.clone().role_add(req).await?,
229 };
230
231 Ok(resp.into_inner().into())
232 }
233
234 async fn role_delete<R>(&self, req: R) -> Result<AuthRoleDeleteResponse>
235 where
236 R: Into<AuthRoleDeleteRequest>,
237 {
238 let req = tonic::Request::new(req.into().into());
239 let resp = match self.auth_user {
240 Some(_) => {
241 self.execute_with_retries(req, |req| async {
242 self.auth_client.clone().role_delete(req).await
243 })
244 .await?
245 }
246 None => self.auth_client.clone().role_delete(req).await?,
247 };
248
249 Ok(resp.into_inner().into())
250 }
251
252 async fn role_list(&self) -> Result<AuthRoleListResponse> {
253 let req = tonic::Request::new(AuthRoleListRequest::default().into());
254 let resp = match self.auth_user {
255 Some(_) => {
256 self.execute_with_retries(req, |req| async {
257 self.auth_client.clone().role_list(req).await
258 })
259 .await?
260 }
261 None => self.auth_client.clone().role_list(req).await?,
262 };
263
264 Ok(resp.into_inner().into())
265 }
266}
267
268impl Client {
269 async fn new_channel(cfg: &ClientConfig) -> Result<Channel> {
270 let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
271 for e in cfg.endpoints.iter() {
272 #[cfg(not(feature = "tls"))]
273 let c = Channel::from_shared(e.url.clone())?
274 .connect_timeout(cfg.connect_timeout)
275 .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
276
277 #[cfg(feature = "tls")]
278 let mut c = Channel::from_shared(e.url.clone())?
279 .connect_timeout(cfg.connect_timeout)
280 .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
281 #[cfg(feature = "tls")]
282 {
283 if let Some(tls) = e.tls_opt.to_owned() {
284 c = c.tls_config(tls)?;
285 }
286 }
287
288 endpoints.push(c);
289 }
290
291 Ok(Channel::balance_list(endpoints.into_iter()))
292 }
293
294 pub async fn new(cfg: ClientConfig) -> Result<Self> {
299 let channel = Self::new_channel(&cfg).await?;
300
301 let auth_client = AuthClient::new(channel.clone());
302 let kv_client = KvClient::new(channel.clone());
303 let watch_client = WatchClient::new(channel.clone());
304 let cluster_client = ClusterClient::new(channel.clone());
305 let lease_client = LeaseClient::new(channel);
306
307 let mut cli = Self {
308 auth_client,
309 kv_client,
310 watch_client,
311 cluster_client,
312 lease_client,
313 auth_user: None,
314 token: Arc::new(RwLock::new(None)),
315 };
316
317 if let Some((username, password)) = cfg.auth {
318 cli.auth_user = Some((username, password));
319 cli.refresh_token().await.unwrap();
320 };
321
322 Ok(cli)
323 }
324
325 async fn refresh_token(&self) -> Result<()> {
326 if let Some((username, password)) = &self.auth_user {
327 let token = self.authenticate((username, password)).await?.token;
328 let t = match MetadataValue::try_from(&token) {
329 Ok(t) => t,
330 Err(err) => return Err(Error::ParseMetadataToken(err.to_string())),
331 };
332 let mut x = self.token.write().await;
333 *x = Some(t);
334 }
335
336 Ok(())
337 }
338
339 async fn set_token<T>(&self, req: &mut tonic::Request<T>) {
340 let token = self.token.clone();
341 let h = token.read().await;
342 if let Some(token) = h.to_owned() {
343 req.metadata_mut().insert("authorization", token);
344 }
345 }
346
347 async fn execute_with_retries<F, Fut, T, R>(&self, req: tonic::Request<T>, f: F) -> Result<R>
348 where
349 F: Fn(tonic::Request<T>) -> Fut,
350 Fut: Future<Output = std::result::Result<R, Status>>,
351 T: Clone,
352 {
353 for _i in 1..=MAX_RETRY {
354 let mut new_req = tonic::Request::new(req.get_ref().clone());
355 self.set_token(&mut new_req).await;
356
357 match f(new_req).await {
358 Ok(response) => {
359 return Ok(response);
360 }
361 Err(status) => {
362 if status.code() == tonic::Code::Unauthenticated {
363 self.refresh_token().await?;
364 } else {
365 return Err(Error::Response(status));
366 }
367 }
368 }
369 }
370 Err(Error::ExecuteFailed)
371 }
372}
373
374impl KeyValueOp for Client {
375 async fn put<R>(&self, req: R) -> Result<PutResponse>
376 where
377 R: Into<PutRequest>,
378 {
379 let req = tonic::Request::new(req.into().into());
380 let resp = self
381 .execute_with_retries(req, |req| async { self.kv_client.clone().put(req).await })
382 .await?;
383
384 Ok(resp.into_inner().into())
385 }
386
387 async fn get<R>(&self, req: R) -> Result<RangeResponse>
388 where
389 R: Into<RangeRequest>,
390 {
391 let req = tonic::Request::new(req.into().into());
392 let resp = self
393 .execute_with_retries(req, |req| async { self.kv_client.clone().range(req).await })
394 .await?;
395
396 Ok(resp.into_inner().into())
397 }
398
399 async fn get_all(&self) -> Result<RangeResponse> {
400 self.get(KeyRange::all()).await
401 }
402
403 async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
404 where
405 K: Into<Vec<u8>>,
406 {
407 self.get(KeyRange::prefix(p)).await
408 }
409
410 async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
411 where
412 F: Into<Vec<u8>>,
413 E: Into<Vec<u8>>,
414 {
415 self.get(KeyRange::range(from, end)).await
416 }
417
418 async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
419 where
420 R: Into<DeleteRequest>,
421 {
422 let req = tonic::Request::new(req.into().into());
423 let resp = self
424 .execute_with_retries(req, |req| async {
425 self.kv_client.clone().delete_range(req).await
426 })
427 .await?;
428
429 Ok(resp.into_inner().into())
430 }
431
432 async fn delete_all(&self) -> Result<DeleteResponse> {
433 self.delete(KeyRange::all()).await
434 }
435
436 async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
437 where
438 K: Into<Vec<u8>>,
439 {
440 self.delete(KeyRange::prefix(p)).await
441 }
442
443 async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
444 where
445 F: Into<Vec<u8>>,
446 E: Into<Vec<u8>>,
447 {
448 self.delete(KeyRange::range(from, end)).await
449 }
450
451 async fn txn<R>(&self, req: R) -> Result<TxnResponse>
452 where
453 R: Into<TxnRequest>,
454 {
455 let req = tonic::Request::new(req.into().into());
456 let resp = self
457 .execute_with_retries(req, |req| async { self.kv_client.clone().txn(req).await })
458 .await?;
459
460 Ok(resp.into_inner().into())
461 }
462
463 async fn compact<R>(&self, req: R) -> Result<CompactResponse>
464 where
465 R: Into<CompactRequest>,
466 {
467 let req = tonic::Request::new(req.into().into());
468 let resp = self
469 .execute_with_retries(req, |req| async {
470 self.kv_client.clone().compact(req).await
471 })
472 .await?;
473
474 Ok(resp.into_inner().into())
475 }
476}
477
478impl WatchOp for Client {
479 async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
480 where
481 R: Into<WatchCreateRequest>,
482 {
483 let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
484
485 tx.send(req.into().into()).await?;
486
487 let mut req = tonic::Request::new(ReceiverStream::new(rx));
488 self.refresh_token().await?;
489 self.set_token(&mut req).await;
490
491 req.metadata_mut()
492 .insert("hasleader", "true".try_into().unwrap());
493
494 let resp = self.watch_client.clone().watch(req).await?;
495
496 let mut inbound = resp.into_inner();
497
498 let watch_id = match inbound.message().await? {
499 Some(resp) => {
500 if !resp.created {
501 return Err(Error::WatchEvent(
502 "should receive created event at first".to_owned(),
503 ));
504 }
505 if resp.canceled {
506 return Err(Error::WatchEvent(resp.cancel_reason));
507 }
508 assert!(resp.events.is_empty(), "received created event: {resp:?}");
509 resp.watch_id
510 }
511
512 None => return Err(Error::CreateWatch),
513 };
514
515 Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
516 }
517}
518
519impl LeaseOp for Client {
520 async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
521 where
522 R: Into<LeaseGrantRequest>,
523 {
524 let req = tonic::Request::new(req.into().into());
525 let resp = self
526 .execute_with_retries(req, |req| async {
527 self.lease_client.clone().lease_grant(req).await
528 })
529 .await?;
530
531 Ok(resp.into_inner().into())
532 }
533
534 async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
535 where
536 R: Into<LeaseRevokeRequest>,
537 {
538 let req = tonic::Request::new(req.into().into());
539 let resp = self
540 .execute_with_retries(req, |req| async {
541 self.lease_client.clone().lease_revoke(req).await
542 })
543 .await?;
544
545 Ok(resp.into_inner().into())
546 }
547
548 async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
549 let (req_tx, req_rx) = channel(1024);
550
551 let req_rx = ReceiverStream::new(req_rx);
552
553 let initial_req = LeaseKeepAliveRequest { id: lease_id };
554
555 req_tx
556 .send(initial_req)
557 .await
558 .map_err(|_| Error::ChannelClosed)?;
559
560 let mut resp_rx = self
561 .lease_client
562 .clone()
563 .lease_keep_alive(req_rx)
564 .await?
565 .into_inner();
566
567 let lease_id = match resp_rx.message().await? {
568 Some(resp) => resp.id,
569 None => {
570 return Err(Error::CreateWatch);
571 }
572 };
573
574 Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
575 }
576
577 async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
578 where
579 R: Into<LeaseTimeToLiveRequest>,
580 {
581 let req = tonic::Request::new(req.into().into());
582 let resp = self
583 .execute_with_retries(req, |req| async {
584 self.lease_client.clone().lease_time_to_live(req).await
585 })
586 .await?;
587
588 Ok(resp.into_inner().into())
589 }
590}
591
592impl ClusterOp for Client {
593 async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
594 where
595 R: Into<MemberAddRequest>,
596 {
597 let req = tonic::Request::new(req.into().into());
598 let resp = self
599 .execute_with_retries(req, |req| async {
600 self.cluster_client.clone().member_add(req).await
601 })
602 .await?;
603
604 Ok(resp.into_inner().into())
605 }
606
607 async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
608 where
609 R: Into<MemberRemoveRequest>,
610 {
611 let req = tonic::Request::new(req.into().into());
612 let resp = self
613 .execute_with_retries(req, |req| async {
614 self.cluster_client.clone().member_remove(req).await
615 })
616 .await?;
617
618 Ok(resp.into_inner().into())
619 }
620
621 async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
622 where
623 R: Into<MemberUpdateRequest>,
624 {
625 let req = tonic::Request::new(req.into().into());
626 let resp = self
627 .execute_with_retries(req, |req| async {
628 self.cluster_client.clone().member_update(req).await
629 })
630 .await?;
631
632 Ok(resp.into_inner().into())
633 }
634
635 async fn member_list(&self) -> Result<MemberListResponse> {
636 let req = tonic::Request::new(MemberListRequest::new().into());
637 let resp = self
638 .execute_with_retries(req, |req| async {
639 self.cluster_client.clone().member_list(req).await
640 })
641 .await?;
642
643 Ok(resp.into_inner().into())
644 }
645}