etcd_rs/
client.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::mpsc::channel;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::{
7    codegen::InterceptedService,
8    metadata::{Ascii, MetadataValue},
9    service::Interceptor,
10    transport::Channel,
11    Request, Status,
12};
13
14use crate::auth::{AuthOp, AuthenticateRequest, AuthenticateResponse};
15use crate::cluster::{
16    ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
17    MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
18};
19use crate::kv::{
20    CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
21    PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
22};
23use crate::lease::{
24    LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp, LeaseRevokeRequest,
25    LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
26};
27use crate::proto::etcdserverpb;
28use crate::proto::etcdserverpb::cluster_client::ClusterClient;
29use crate::proto::etcdserverpb::maintenance_client::MaintenanceClient;
30use crate::proto::etcdserverpb::LeaseKeepAliveRequest;
31use crate::proto::etcdserverpb::{
32    auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
33    watch_client::WatchClient,
34};
35use crate::watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream};
36use crate::{Error, Result};
37
38#[derive(Clone)]
39pub struct TokenInterceptor {
40    token: Option<MetadataValue<Ascii>>,
41}
42
43impl TokenInterceptor {
44    fn new(token: Option<String>) -> Self {
45        Self {
46            token: token.map(|token: String| MetadataValue::try_from(&token).unwrap()),
47        }
48    }
49}
50
51impl Interceptor for TokenInterceptor {
52    fn call(&mut self, mut req: tonic::Request<()>) -> std::result::Result<Request<()>, Status> {
53        match &self.token {
54            Some(token) => {
55                req.metadata_mut().insert("authorization", token.clone());
56                Ok(req)
57            }
58            None => Ok(req),
59        }
60    }
61}
62
63#[cfg(feature = "tls")]
64#[derive(Debug, Clone)]
65enum TlsOption {
66    None,
67    WithConfig(tonic::transport::ClientTlsConfig),
68}
69
70#[cfg(not(feature = "tls"))]
71#[derive(Debug, Clone)]
72enum TlsOption {
73    None,
74}
75
76#[derive(Debug, Clone)]
77pub struct Endpoint {
78    url: String,
79
80    tls_opt: TlsOption,
81}
82
83impl Endpoint {
84    pub fn new(url: impl Into<String>) -> Self {
85        Self {
86            url: url.into(),
87            tls_opt: TlsOption::None,
88        }
89    }
90
91    #[cfg(feature = "tls")]
92    pub fn tls_raw(
93        mut self,
94        domain_name: impl Into<String>,
95        ca_cert: impl AsRef<[u8]>,
96        client_cert: impl AsRef<[u8]>,
97        client_key: impl AsRef<[u8]>,
98    ) -> Self {
99        use tonic::transport::{Certificate, ClientTlsConfig, Identity};
100
101        let certificate = Certificate::from_pem(ca_cert);
102        let identity = Identity::from_pem(client_cert, client_key);
103
104        self.tls_opt = TlsOption::WithConfig(
105            ClientTlsConfig::new()
106                .domain_name(domain_name)
107                .ca_certificate(certificate)
108                .identity(identity),
109        );
110
111        self
112    }
113
114    #[cfg(feature = "tls")]
115    pub async fn tls(
116        self,
117        domain_name: impl Into<String>,
118        ca_cert_path: impl AsRef<std::path::Path>,
119        client_cert_path: impl AsRef<std::path::Path>,
120        client_key_path: impl AsRef<std::path::Path>,
121    ) -> Result<Self> {
122        use tokio::fs::read;
123
124        let ca_cert = read(ca_cert_path).await?;
125
126        let client_cert = read(client_cert_path).await?;
127        let client_key = read(client_key_path).await?;
128
129        Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
130    }
131}
132
133impl<T> From<T> for Endpoint
134where
135    T: Into<String>,
136{
137    fn from(url: T) -> Self {
138        Self {
139            url: url.into(),
140            tls_opt: TlsOption::None,
141        }
142    }
143}
144
145/// Config for establishing etcd client.
146#[derive(Clone, Debug)]
147pub struct ClientConfig {
148    pub endpoints: Vec<Endpoint>,
149    pub auth: Option<(String, String)>,
150    pub connect_timeout: Duration,
151    pub http2_keep_alive_interval: Duration,
152}
153
154impl ClientConfig {
155    pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
156        Self {
157            endpoints: endpoints.into(),
158            auth: None,
159            connect_timeout: Duration::from_secs(30),
160            http2_keep_alive_interval: Duration::from_secs(5),
161        }
162    }
163
164    pub fn auth(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
165        self.auth = Some((name.into(), password.into()));
166        self
167    }
168
169    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
170        self.connect_timeout = timeout;
171        self
172    }
173
174    pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
175        self.http2_keep_alive_interval = interval;
176        self
177    }
178}
179
180/// Client is an abstraction for grouping etcd operations and managing underlying network communications.
181#[derive(Clone)]
182pub struct Client {
183    auth_client: AuthClient<InterceptedService<Channel, TokenInterceptor>>,
184    kv_client: KvClient<InterceptedService<Channel, TokenInterceptor>>,
185    watch_client: WatchClient<InterceptedService<Channel, TokenInterceptor>>,
186    cluster_client: ClusterClient<InterceptedService<Channel, TokenInterceptor>>,
187    maintenance_client: MaintenanceClient<InterceptedService<Channel, TokenInterceptor>>,
188    lease_client: LeaseClient<InterceptedService<Channel, TokenInterceptor>>,
189}
190
191impl Client {
192    /// Build clients from tonic [`Channel`] directly.
193    ///
194    /// For advanced users, it provides the ability to control more details about the connection.
195    pub fn with_channel(channel: Channel, token: Option<String>) -> Self {
196        let auth_interceptor = TokenInterceptor::new(token);
197
198        let auth_client = AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone());
199        let kv_client = KvClient::with_interceptor(channel.clone(), auth_interceptor.clone());
200        let watch_client = WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone());
201        let cluster_client =
202            ClusterClient::with_interceptor(channel.clone(), auth_interceptor.clone());
203        let maintenance_client =
204            MaintenanceClient::with_interceptor(channel.clone(), auth_interceptor.clone());
205        let lease_client = LeaseClient::with_interceptor(channel, auth_interceptor);
206
207        Self {
208            auth_client,
209            kv_client,
210            watch_client,
211            cluster_client,
212            maintenance_client,
213            lease_client,
214        }
215    }
216
217    pub async fn connect_with_token(cfg: &ClientConfig, token: Option<String>) -> Result<Self> {
218        let channel = {
219            let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
220            for e in cfg.endpoints.iter() {
221                let mut c = Channel::from_shared(e.url.clone())?
222                    .connect_timeout(cfg.connect_timeout)
223                    .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
224
225                #[cfg(feature = "tls")]
226                {
227                    if let TlsOption::WithConfig(tls) = e.tls_opt.clone() {
228                        c = c.tls_config(tls)?;
229                    }
230                }
231
232                endpoints.push(c);
233            }
234
235            Channel::balance_list(endpoints.into_iter())
236        };
237
238        Ok(Self::with_channel(channel, token))
239    }
240
241    /// Connects to etcd cluster and returns a client.
242    ///
243    /// # Errors
244    /// Will returns `Err` if failed to contact with given endpoints or authentication failed.
245    pub async fn connect(mut cfg: ClientConfig) -> Result<Self> {
246        let cli = Self::connect_with_token(&cfg, None).await?;
247
248        match cfg.auth.take() {
249            Some((name, password)) => {
250                let token = cli.authenticate((name, password)).await?.token;
251
252                Self::connect_with_token(&cfg, Some(token)).await
253            }
254            None => Ok(cli),
255        }
256    }
257}
258
259#[async_trait]
260impl AuthOp for Client {
261    async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
262    where
263        R: Into<AuthenticateRequest> + Send,
264    {
265        let req = tonic::Request::new(req.into().into());
266        let resp = self.auth_client.clone().authenticate(req).await?;
267
268        Ok(resp.into_inner().into())
269    }
270}
271
272#[async_trait]
273impl KeyValueOp for Client {
274    async fn put<R>(&self, req: R) -> Result<PutResponse>
275    where
276        R: Into<PutRequest> + Send,
277    {
278        let req = tonic::Request::new(req.into().into());
279        let resp = self.kv_client.clone().put(req).await?;
280
281        Ok(resp.into_inner().into())
282    }
283
284    async fn get<R>(&self, req: R) -> Result<RangeResponse>
285    where
286        R: Into<RangeRequest> + Send,
287    {
288        let req = tonic::Request::new(req.into().into());
289        let resp = self.kv_client.clone().range(req).await?;
290
291        Ok(resp.into_inner().into())
292    }
293
294    async fn get_all(&self) -> Result<RangeResponse> {
295        self.get(KeyRange::all()).await
296    }
297
298    async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
299    where
300        K: Into<Vec<u8>> + Send,
301    {
302        self.get(KeyRange::prefix(p)).await
303    }
304
305    async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
306    where
307        F: Into<Vec<u8>> + Send,
308        E: Into<Vec<u8>> + Send,
309    {
310        self.get(KeyRange::range(from, end)).await
311    }
312
313    async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
314    where
315        R: Into<DeleteRequest> + Send,
316    {
317        let req = tonic::Request::new(req.into().into());
318        let resp = self.kv_client.clone().delete_range(req).await?;
319
320        Ok(resp.into_inner().into())
321    }
322
323    async fn delete_all(&self) -> Result<DeleteResponse> {
324        self.delete(KeyRange::all()).await
325    }
326
327    async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
328    where
329        K: Into<Vec<u8>> + Send,
330    {
331        self.delete(KeyRange::prefix(p)).await
332    }
333
334    async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
335    where
336        F: Into<Vec<u8>> + Send,
337        E: Into<Vec<u8>> + Send,
338    {
339        self.delete(KeyRange::range(from, end)).await
340    }
341
342    async fn txn<R>(&self, req: R) -> Result<TxnResponse>
343    where
344        R: Into<TxnRequest> + Send,
345    {
346        let req = tonic::Request::new(req.into().into());
347        let resp = self.kv_client.clone().txn(req).await?;
348
349        Ok(resp.into_inner().into())
350    }
351
352    async fn compact<R>(&self, req: R) -> Result<CompactResponse>
353    where
354        R: Into<CompactRequest> + Send,
355    {
356        let req = tonic::Request::new(req.into().into());
357        let resp = self.kv_client.clone().compact(req).await?;
358
359        Ok(resp.into_inner().into())
360    }
361}
362
363#[async_trait]
364impl WatchOp for Client {
365    async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
366    where
367        R: Into<WatchCreateRequest> + Send,
368    {
369        let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
370
371        tx.send(req.into().into()).await?;
372
373        let mut req = tonic::Request::new(ReceiverStream::new(rx));
374
375        req.metadata_mut()
376            .insert("hasleader", "true".try_into().unwrap());
377
378        let resp = self.watch_client.clone().watch(req).await?;
379
380        let mut inbound = resp.into_inner();
381
382        let watch_id = match inbound.message().await? {
383            Some(resp) => {
384                if !resp.created {
385                    return Err(Error::WatchEvent(
386                        "should receive created event at first".to_owned(),
387                    ));
388                }
389                assert!(resp.events.is_empty(), "received created event {:?}", resp);
390                resp.watch_id
391            }
392
393            None => return Err(Error::CreateWatch),
394        };
395
396        Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
397    }
398}
399
400#[async_trait]
401impl LeaseOp for Client {
402    async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
403    where
404        R: Into<LeaseGrantRequest> + Send,
405    {
406        let req = tonic::Request::new(req.into().into());
407        let resp = self.lease_client.clone().lease_grant(req).await?;
408        Ok(resp.into_inner().into())
409    }
410
411    async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
412    where
413        R: Into<LeaseRevokeRequest> + Send,
414    {
415        let req = tonic::Request::new(req.into().into());
416        let resp = self.lease_client.clone().lease_revoke(req).await?;
417        Ok(resp.into_inner().into())
418    }
419
420    async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
421        let (req_tx, req_rx) = channel(1024);
422
423        let req_rx = ReceiverStream::new(req_rx);
424
425        let initial_req = LeaseKeepAliveRequest { id: lease_id };
426
427        req_tx
428            .send(initial_req)
429            .await
430            .map_err(|_| Error::ChannelClosed)?;
431
432        let mut resp_rx = self
433            .lease_client
434            .clone()
435            .lease_keep_alive(req_rx)
436            .await?
437            .into_inner();
438
439        let lease_id = match resp_rx.message().await? {
440            Some(resp) => resp.id,
441            None => {
442                return Err(Error::CreateWatch);
443            }
444        };
445
446        Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
447    }
448
449    async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
450    where
451        R: Into<LeaseTimeToLiveRequest> + Send,
452    {
453        let req = tonic::Request::new(req.into().into());
454        let resp = self.lease_client.clone().lease_time_to_live(req).await?;
455        Ok(resp.into_inner().into())
456    }
457}
458
459#[async_trait]
460impl ClusterOp for Client {
461    async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
462    where
463        R: Into<MemberAddRequest> + Send,
464    {
465        let req = tonic::Request::new(req.into().into());
466        let resp = self.cluster_client.clone().member_add(req).await?;
467
468        Ok(resp.into_inner().into())
469    }
470
471    async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
472    where
473        R: Into<MemberRemoveRequest> + Send,
474    {
475        let req = tonic::Request::new(req.into().into());
476        let resp = self.cluster_client.clone().member_remove(req).await?;
477
478        Ok(resp.into_inner().into())
479    }
480
481    async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
482    where
483        R: Into<MemberUpdateRequest> + Send,
484    {
485        let req = tonic::Request::new(req.into().into());
486        let resp = self.cluster_client.clone().member_update(req).await?;
487
488        Ok(resp.into_inner().into())
489    }
490
491    async fn member_list(&self) -> Result<MemberListResponse> {
492        let req = tonic::Request::new(MemberListRequest::new().into());
493        let resp = self.cluster_client.clone().member_list(req).await?;
494
495        Ok(resp.into_inner().into())
496    }
497}