ya_etcd_rs/
client.rs

1use std::{future::Future, sync::Arc, time::Duration};
2
3use tokio::sync::{RwLock, mpsc::channel};
4use tokio_stream::wrappers::ReceiverStream;
5use tonic::{
6    Status,
7    metadata::{Ascii, MetadataValue},
8    transport::Channel,
9};
10
11use crate::{
12    AuthDisableResponse, AuthEnableResponse, AuthRoleAddRequest, AuthRoleAddResponse,
13    AuthRoleDeleteRequest, AuthRoleDeleteResponse, AuthRoleListResponse, AuthStatusRequest,
14    AuthStatusResponse, AuthenticateRequest, Error, Result,
15    auth::{AuthOp, AuthenticateResponse},
16    cluster::{
17        ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
18        MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
19    },
20    kv::{
21        CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
22        PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
23    },
24    lease::{
25        LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp,
26        LeaseRevokeRequest, LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
27    },
28    proto::etcdserverpb,
29    proto::etcdserverpb::cluster_client::ClusterClient,
30    proto::etcdserverpb::{
31        auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
32        watch_client::WatchClient,
33    },
34    watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream},
35};
36use crate::{
37    auth::{AuthDisableRequest, AuthEnableRequest, AuthRoleListRequest},
38    proto::etcdserverpb::LeaseKeepAliveRequest,
39};
40
41static MAX_RETRY: i32 = 3;
42
43#[derive(Debug, Clone)]
44pub struct Endpoint {
45    url: String,
46    #[cfg(feature = "tls")]
47    tls_opt: Option<tonic::transport::ClientTlsConfig>,
48}
49
50impl Endpoint {
51    pub fn new(url: impl Into<String>) -> Self {
52        Self {
53            url: url.into(),
54            #[cfg(feature = "tls")]
55            tls_opt: None,
56        }
57    }
58
59    #[cfg(feature = "tls")]
60    pub fn tls_raw(
61        mut self,
62        domain_name: impl Into<String>,
63        ca_cert: impl AsRef<[u8]>,
64        client_cert: impl AsRef<[u8]>,
65        client_key: impl AsRef<[u8]>,
66    ) -> Self {
67        use tonic::transport::{Certificate, ClientTlsConfig, Identity};
68
69        let certificate = Certificate::from_pem(ca_cert);
70        let identity = Identity::from_pem(client_cert, client_key);
71
72        self.tls_opt = Some(
73            ClientTlsConfig::new()
74                .domain_name(domain_name)
75                .ca_certificate(certificate)
76                .identity(identity),
77        );
78
79        self
80    }
81
82    #[cfg(feature = "tls")]
83    pub async fn tls(
84        self,
85        domain_name: impl Into<String>,
86        ca_cert_path: impl AsRef<std::path::Path>,
87        client_cert_path: impl AsRef<std::path::Path>,
88        client_key_path: impl AsRef<std::path::Path>,
89    ) -> Result<Self> {
90        use tokio::fs::read;
91
92        let ca_cert = read(ca_cert_path).await?;
93        let client_cert = read(client_cert_path).await?;
94        let client_key = read(client_key_path).await?;
95
96        Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
97    }
98}
99
100impl<T> From<T> for Endpoint
101where
102    T: Into<String>,
103{
104    fn from(url: T) -> Self {
105        Self {
106            url: url.into(),
107            #[cfg(feature = "tls")]
108            tls_opt: None,
109        }
110    }
111}
112
113/// Config for establishing etcd client.
114#[derive(Clone, Debug)]
115pub struct ClientConfig {
116    pub endpoints: Vec<Endpoint>,
117    pub auth: Option<(String, String)>,
118    pub connect_timeout: Duration,
119    pub http2_keep_alive_interval: Duration,
120}
121
122impl ClientConfig {
123    pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
124        Self {
125            endpoints: endpoints.into(),
126            auth: None,
127            connect_timeout: Duration::from_secs(30),
128            http2_keep_alive_interval: Duration::from_secs(5),
129        }
130    }
131
132    pub fn auth(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
133        self.auth = Some((name.into(), password.into()));
134        self
135    }
136
137    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
138        self.connect_timeout = timeout;
139        self
140    }
141
142    pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
143        self.http2_keep_alive_interval = interval;
144        self
145    }
146}
147
148/// Client is an abstraction for grouping etcd operations and managing underlying network communications.
149#[derive(Clone)]
150pub struct Client {
151    auth_client: AuthClient<Channel>,
152    kv_client: KvClient<Channel>,
153    watch_client: WatchClient<Channel>,
154    cluster_client: ClusterClient<Channel>,
155    lease_client: LeaseClient<Channel>,
156    token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
157    auth_user: Option<(String, String)>,
158}
159
160impl AuthOp for Client {
161    async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
162    where
163        R: Into<AuthenticateRequest>,
164    {
165        let req = tonic::Request::new(req.into().into());
166        let resp = self.auth_client.clone().authenticate(req).await?;
167
168        Ok(resp.into_inner().into())
169    }
170
171    async fn auth_status(&self) -> Result<AuthStatusResponse> {
172        let req = tonic::Request::new(AuthStatusRequest::default().into());
173        let resp = match self.auth_user {
174            Some(_) => {
175                self.execute_with_retries(req, |req| async {
176                    self.auth_client.clone().auth_status(req).await
177                })
178                .await?
179            }
180            None => self.auth_client.clone().auth_status(req).await?,
181        };
182
183        Ok(resp.into_inner().into())
184    }
185
186    async fn auth_enable(&self) -> Result<AuthEnableResponse> {
187        let req = tonic::Request::new(AuthEnableRequest::default().into());
188        let resp = match self.auth_user {
189            Some(_) => {
190                self.execute_with_retries(req, |req| async {
191                    self.auth_client.clone().auth_enable(req).await
192                })
193                .await?
194            }
195            None => self.auth_client.clone().auth_enable(req).await?,
196        };
197
198        Ok(resp.into_inner().into())
199    }
200
201    async fn auth_disable(&self) -> Result<AuthDisableResponse> {
202        let req = tonic::Request::new(AuthDisableRequest::default().into());
203        let resp = match self.auth_user {
204            Some(_) => {
205                self.execute_with_retries(req, |req| async {
206                    self.auth_client.clone().auth_disable(req).await
207                })
208                .await?
209            }
210            None => self.auth_client.clone().auth_disable(req).await?,
211        };
212
213        Ok(resp.into_inner().into())
214    }
215
216    async fn role_add<R>(&self, req: R) -> Result<AuthRoleAddResponse>
217    where
218        R: Into<AuthRoleAddRequest>,
219    {
220        let req = tonic::Request::new(req.into().into());
221        let resp = match self.auth_user {
222            Some(_) => {
223                self.execute_with_retries(req, |req| async {
224                    self.auth_client.clone().role_add(req).await
225                })
226                .await?
227            }
228            None => self.auth_client.clone().role_add(req).await?,
229        };
230
231        Ok(resp.into_inner().into())
232    }
233
234    async fn role_delete<R>(&self, req: R) -> Result<AuthRoleDeleteResponse>
235    where
236        R: Into<AuthRoleDeleteRequest>,
237    {
238        let req = tonic::Request::new(req.into().into());
239        let resp = match self.auth_user {
240            Some(_) => {
241                self.execute_with_retries(req, |req| async {
242                    self.auth_client.clone().role_delete(req).await
243                })
244                .await?
245            }
246            None => self.auth_client.clone().role_delete(req).await?,
247        };
248
249        Ok(resp.into_inner().into())
250    }
251
252    async fn role_list(&self) -> Result<AuthRoleListResponse> {
253        let req = tonic::Request::new(AuthRoleListRequest::default().into());
254        let resp = match self.auth_user {
255            Some(_) => {
256                self.execute_with_retries(req, |req| async {
257                    self.auth_client.clone().role_list(req).await
258                })
259                .await?
260            }
261            None => self.auth_client.clone().role_list(req).await?,
262        };
263
264        Ok(resp.into_inner().into())
265    }
266}
267
268impl Client {
269    async fn new_channel(cfg: &ClientConfig) -> Result<Channel> {
270        let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
271        for e in cfg.endpoints.iter() {
272            #[cfg(not(feature = "tls"))]
273            let c = Channel::from_shared(e.url.clone())?
274                .connect_timeout(cfg.connect_timeout)
275                .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
276
277            #[cfg(feature = "tls")]
278            let mut c = Channel::from_shared(e.url.clone())?
279                .connect_timeout(cfg.connect_timeout)
280                .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
281            #[cfg(feature = "tls")]
282            {
283                if let Some(tls) = e.tls_opt.to_owned() {
284                    c = c.tls_config(tls)?;
285                }
286            }
287
288            endpoints.push(c);
289        }
290
291        Ok(Channel::balance_list(endpoints.into_iter()))
292    }
293
294    /// new connect to etcd cluster and returns a client.
295    ///
296    /// # Errors
297    /// Will returns `Err` if failed to contact with given endpoints or authentication failed.
298    pub async fn new(cfg: ClientConfig) -> Result<Self> {
299        let channel = Self::new_channel(&cfg).await?;
300
301        let auth_client = AuthClient::new(channel.clone());
302        let kv_client = KvClient::new(channel.clone());
303        let watch_client = WatchClient::new(channel.clone());
304        let cluster_client = ClusterClient::new(channel.clone());
305        let lease_client = LeaseClient::new(channel);
306
307        let mut cli = Self {
308            auth_client,
309            kv_client,
310            watch_client,
311            cluster_client,
312            lease_client,
313            auth_user: None,
314            token: Arc::new(RwLock::new(None)),
315        };
316
317        if let Some((username, password)) = cfg.auth {
318            cli.auth_user = Some((username, password));
319            cli.refresh_token().await.unwrap();
320        };
321
322        Ok(cli)
323    }
324
325    async fn refresh_token(&self) -> Result<()> {
326        if let Some((username, password)) = &self.auth_user {
327            let token = self.authenticate((username, password)).await?.token;
328            let t = match MetadataValue::try_from(&token) {
329                Ok(t) => t,
330                Err(err) => return Err(Error::ParseMetadataToken(err.to_string())),
331            };
332            let mut x = self.token.write().await;
333            *x = Some(t);
334        }
335
336        Ok(())
337    }
338
339    async fn set_token<T>(&self, req: &mut tonic::Request<T>) {
340        let token = self.token.clone();
341        let h = token.read().await;
342        if let Some(token) = h.to_owned() {
343            req.metadata_mut().insert("authorization", token);
344        }
345    }
346
347    async fn execute_with_retries<F, Fut, T, R>(&self, req: tonic::Request<T>, f: F) -> Result<R>
348    where
349        F: Fn(tonic::Request<T>) -> Fut,
350        Fut: Future<Output = std::result::Result<R, Status>>,
351        T: Clone,
352    {
353        for _i in 1..=MAX_RETRY {
354            let mut new_req = tonic::Request::new(req.get_ref().clone());
355            self.set_token(&mut new_req).await;
356
357            match f(new_req).await {
358                Ok(response) => {
359                    return Ok(response);
360                }
361                Err(status) => {
362                    if status.code() == tonic::Code::Unauthenticated {
363                        self.refresh_token().await?;
364                    } else {
365                        return Err(Error::Response(status));
366                    }
367                }
368            }
369        }
370        Err(Error::ExecuteFailed)
371    }
372}
373
374impl KeyValueOp for Client {
375    async fn put<R>(&self, req: R) -> Result<PutResponse>
376    where
377        R: Into<PutRequest>,
378    {
379        let req = tonic::Request::new(req.into().into());
380        let resp = self
381            .execute_with_retries(req, |req| async { self.kv_client.clone().put(req).await })
382            .await?;
383
384        Ok(resp.into_inner().into())
385    }
386
387    async fn get<R>(&self, req: R) -> Result<RangeResponse>
388    where
389        R: Into<RangeRequest>,
390    {
391        let req = tonic::Request::new(req.into().into());
392        let resp = self
393            .execute_with_retries(req, |req| async { self.kv_client.clone().range(req).await })
394            .await?;
395
396        Ok(resp.into_inner().into())
397    }
398
399    async fn get_all(&self) -> Result<RangeResponse> {
400        self.get(KeyRange::all()).await
401    }
402
403    async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
404    where
405        K: Into<Vec<u8>>,
406    {
407        self.get(KeyRange::prefix(p)).await
408    }
409
410    async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
411    where
412        F: Into<Vec<u8>>,
413        E: Into<Vec<u8>>,
414    {
415        self.get(KeyRange::range(from, end)).await
416    }
417
418    async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
419    where
420        R: Into<DeleteRequest>,
421    {
422        let req = tonic::Request::new(req.into().into());
423        let resp = self
424            .execute_with_retries(req, |req| async {
425                self.kv_client.clone().delete_range(req).await
426            })
427            .await?;
428
429        Ok(resp.into_inner().into())
430    }
431
432    async fn delete_all(&self) -> Result<DeleteResponse> {
433        self.delete(KeyRange::all()).await
434    }
435
436    async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
437    where
438        K: Into<Vec<u8>>,
439    {
440        self.delete(KeyRange::prefix(p)).await
441    }
442
443    async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
444    where
445        F: Into<Vec<u8>>,
446        E: Into<Vec<u8>>,
447    {
448        self.delete(KeyRange::range(from, end)).await
449    }
450
451    async fn txn<R>(&self, req: R) -> Result<TxnResponse>
452    where
453        R: Into<TxnRequest>,
454    {
455        let req = tonic::Request::new(req.into().into());
456        let resp = self
457            .execute_with_retries(req, |req| async { self.kv_client.clone().txn(req).await })
458            .await?;
459
460        Ok(resp.into_inner().into())
461    }
462
463    async fn compact<R>(&self, req: R) -> Result<CompactResponse>
464    where
465        R: Into<CompactRequest>,
466    {
467        let req = tonic::Request::new(req.into().into());
468        let resp = self
469            .execute_with_retries(req, |req| async {
470                self.kv_client.clone().compact(req).await
471            })
472            .await?;
473
474        Ok(resp.into_inner().into())
475    }
476}
477
478impl WatchOp for Client {
479    async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
480    where
481        R: Into<WatchCreateRequest>,
482    {
483        let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
484
485        tx.send(req.into().into()).await?;
486
487        let mut req = tonic::Request::new(ReceiverStream::new(rx));
488        self.refresh_token().await?;
489        self.set_token(&mut req).await;
490
491        req.metadata_mut()
492            .insert("hasleader", "true".try_into().unwrap());
493
494        let resp = self.watch_client.clone().watch(req).await?;
495
496        let mut inbound = resp.into_inner();
497
498        let watch_id = match inbound.message().await? {
499            Some(resp) => {
500                if !resp.created {
501                    return Err(Error::WatchEvent(
502                        "should receive created event at first".to_owned(),
503                    ));
504                }
505                if resp.canceled {
506                    return Err(Error::WatchEvent(resp.cancel_reason));
507                }
508                assert!(resp.events.is_empty(), "received created event: {resp:?}");
509                resp.watch_id
510            }
511
512            None => return Err(Error::CreateWatch),
513        };
514
515        Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
516    }
517}
518
519impl LeaseOp for Client {
520    async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
521    where
522        R: Into<LeaseGrantRequest>,
523    {
524        let req = tonic::Request::new(req.into().into());
525        let resp = self
526            .execute_with_retries(req, |req| async {
527                self.lease_client.clone().lease_grant(req).await
528            })
529            .await?;
530
531        Ok(resp.into_inner().into())
532    }
533
534    async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
535    where
536        R: Into<LeaseRevokeRequest>,
537    {
538        let req = tonic::Request::new(req.into().into());
539        let resp = self
540            .execute_with_retries(req, |req| async {
541                self.lease_client.clone().lease_revoke(req).await
542            })
543            .await?;
544
545        Ok(resp.into_inner().into())
546    }
547
548    async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
549        let (req_tx, req_rx) = channel(1024);
550
551        let req_rx = ReceiverStream::new(req_rx);
552
553        let initial_req = LeaseKeepAliveRequest { id: lease_id };
554
555        req_tx
556            .send(initial_req)
557            .await
558            .map_err(|_| Error::ChannelClosed)?;
559
560        let mut resp_rx = self
561            .lease_client
562            .clone()
563            .lease_keep_alive(req_rx)
564            .await?
565            .into_inner();
566
567        let lease_id = match resp_rx.message().await? {
568            Some(resp) => resp.id,
569            None => {
570                return Err(Error::CreateWatch);
571            }
572        };
573
574        Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
575    }
576
577    async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
578    where
579        R: Into<LeaseTimeToLiveRequest>,
580    {
581        let req = tonic::Request::new(req.into().into());
582        let resp = self
583            .execute_with_retries(req, |req| async {
584                self.lease_client.clone().lease_time_to_live(req).await
585            })
586            .await?;
587
588        Ok(resp.into_inner().into())
589    }
590}
591
592impl ClusterOp for Client {
593    async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
594    where
595        R: Into<MemberAddRequest>,
596    {
597        let req = tonic::Request::new(req.into().into());
598        let resp = self
599            .execute_with_retries(req, |req| async {
600                self.cluster_client.clone().member_add(req).await
601            })
602            .await?;
603
604        Ok(resp.into_inner().into())
605    }
606
607    async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
608    where
609        R: Into<MemberRemoveRequest>,
610    {
611        let req = tonic::Request::new(req.into().into());
612        let resp = self
613            .execute_with_retries(req, |req| async {
614                self.cluster_client.clone().member_remove(req).await
615            })
616            .await?;
617
618        Ok(resp.into_inner().into())
619    }
620
621    async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
622    where
623        R: Into<MemberUpdateRequest>,
624    {
625        let req = tonic::Request::new(req.into().into());
626        let resp = self
627            .execute_with_retries(req, |req| async {
628                self.cluster_client.clone().member_update(req).await
629            })
630            .await?;
631
632        Ok(resp.into_inner().into())
633    }
634
635    async fn member_list(&self) -> Result<MemberListResponse> {
636        let req = tonic::Request::new(MemberListRequest::new().into());
637        let resp = self
638            .execute_with_retries(req, |req| async {
639                self.cluster_client.clone().member_list(req).await
640            })
641            .await?;
642
643        Ok(resp.into_inner().into())
644    }
645}