inve_etcd/
client.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use tokio::sync::mpsc::channel;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::{
7    codegen::InterceptedService,
8    metadata::{Ascii, MetadataValue},
9    service::Interceptor,
10    transport::Channel,
11    Request, Status,
12};
13
14use crate::auth::{AuthOp, AuthenticateRequest, AuthenticateResponse};
15use crate::cluster::{
16    ClusterOp, MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
17    MemberRemoveRequest, MemberRemoveResponse, MemberUpdateRequest, MemberUpdateResponse,
18};
19use crate::kv::{
20    CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, KeyRange, KeyValueOp,
21    PutRequest, PutResponse, RangeRequest, RangeResponse, TxnRequest, TxnResponse,
22};
23use crate::lease::{
24    LeaseGrantRequest, LeaseGrantResponse, LeaseId, LeaseKeepAlive, LeaseOp, LeaseRevokeRequest,
25    LeaseRevokeResponse, LeaseTimeToLiveRequest, LeaseTimeToLiveResponse,
26};
27use crate::proto::etcdserverpb;
28use crate::proto::etcdserverpb::cluster_client::ClusterClient;
29use crate::proto::etcdserverpb::maintenance_client::MaintenanceClient;
30use crate::proto::etcdserverpb::{
31    auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
32    watch_client::WatchClient,
33};
34use crate::watch::{WatchCanceler, WatchCreateRequest, WatchOp, WatchStream};
35use crate::{Error, Result};
36
37#[derive(Clone)]
38pub struct TokenInterceptor {
39    token: Option<MetadataValue<Ascii>>,
40}
41
42impl TokenInterceptor {
43    fn new(token: Option<String>) -> Self {
44        Self {
45            token: token.map(|token: String| MetadataValue::from_str(&token).unwrap()),
46        }
47    }
48}
49
50impl Interceptor for TokenInterceptor {
51    fn call(&mut self, mut req: tonic::Request<()>) -> std::result::Result<Request<()>, Status> {
52        match &self.token {
53            Some(token) => {
54                req.metadata_mut().insert("authorization", token.clone());
55                Ok(req)
56            }
57            None => Ok(req),
58        }
59    }
60}
61
62#[cfg(feature = "tls")]
63#[derive(Debug, Clone)]
64enum TlsOption {
65    None,
66    WithConfig(tonic::transport::ClientTlsConfig),
67}
68
69#[cfg(not(feature = "tls"))]
70#[derive(Debug, Clone)]
71enum TlsOption {
72    None,
73}
74
75#[derive(Debug, Clone)]
76pub struct Endpoint {
77    url: String,
78
79    tls_opt: TlsOption,
80}
81
82impl Endpoint {
83    pub fn new(url: impl Into<String>) -> Self {
84        Self {
85            url: url.into(),
86            tls_opt: TlsOption::None,
87        }
88    }
89
90    #[cfg(feature = "tls")]
91    pub fn tls_raw(
92        mut self,
93        domain_name: impl Into<String>,
94        ca_cert: impl AsRef<[u8]>,
95        client_cert: impl AsRef<[u8]>,
96        client_key: impl AsRef<[u8]>,
97    ) -> Self {
98        use tonic::transport::{Certificate, ClientTlsConfig, Identity};
99
100        let certificate = Certificate::from_pem(ca_cert);
101        let identity = Identity::from_pem(client_cert, client_key);
102
103        self.tls_opt = TlsOption::WithConfig(
104            ClientTlsConfig::new()
105                .domain_name(domain_name)
106                .ca_certificate(certificate)
107                .identity(identity),
108        );
109
110        self
111    }
112
113    #[cfg(feature = "tls")]
114    pub async fn tls(
115        self,
116        domain_name: impl Into<String>,
117        ca_cert_path: impl AsRef<std::path::Path>,
118        client_cert_path: impl AsRef<std::path::Path>,
119        client_key_path: impl AsRef<std::path::Path>,
120    ) -> Result<Self> {
121        use tokio::fs::read;
122
123        let ca_cert = read(ca_cert_path).await?;
124
125        let client_cert = read(client_cert_path).await?;
126        let client_key = read(client_key_path).await?;
127
128        Ok(self.tls_raw(domain_name, ca_cert, client_cert, client_key))
129    }
130}
131
132impl<T> From<T> for Endpoint
133where
134    T: Into<String>,
135{
136    fn from(url: T) -> Self {
137        Self {
138            url: url.into(),
139            tls_opt: TlsOption::None,
140        }
141    }
142}
143
144#[derive(Clone, Debug)]
145pub struct ClientConfig {
146    pub endpoints: Vec<Endpoint>,
147    pub auth: Option<(String, String)>,
148    pub connect_timeout: Duration,
149    pub http2_keep_alive_interval: Duration,
150}
151
152impl ClientConfig {
153    pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
154        Self {
155            endpoints: endpoints.into(),
156            auth: None,
157            connect_timeout: Duration::from_secs(30),
158            http2_keep_alive_interval: Duration::from_secs(5),
159        }
160    }
161
162    pub fn user_auth(&mut self, name: impl Into<String>, password: impl Into<String>) {
163        self.auth = Some((name.into(), password.into()));
164    }
165
166    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
167        self.connect_timeout = timeout;
168        self
169    }
170
171    pub fn http2_keep_alive_interval(mut self, interval: Duration) -> Self {
172        self.http2_keep_alive_interval = interval;
173        self
174    }
175}
176
177#[derive(Clone)]
178pub struct Client {
179    pub auth_client: AuthClient<InterceptedService<Channel, TokenInterceptor>>,
180    pub kv_client: KvClient<InterceptedService<Channel, TokenInterceptor>>,
181    pub watch_client: WatchClient<InterceptedService<Channel, TokenInterceptor>>,
182    pub cluster_client: ClusterClient<InterceptedService<Channel, TokenInterceptor>>,
183    pub maintenance_client: MaintenanceClient<InterceptedService<Channel, TokenInterceptor>>,
184    pub lease_client: LeaseClient<InterceptedService<Channel, TokenInterceptor>>,
185}
186
187impl Client {
188    pub async fn connect_with_token(cfg: &ClientConfig, token: Option<String>) -> Result<Self> {
189        let auth_interceptor = TokenInterceptor::new(token);
190
191        let channel = {
192            let mut endpoints = Vec::with_capacity(cfg.endpoints.len());
193            for e in cfg.endpoints.iter() {
194                let mut c = Channel::from_shared(e.url.clone())?
195                    .connect_timeout(cfg.connect_timeout)
196                    .http2_keep_alive_interval(cfg.http2_keep_alive_interval);
197
198                #[cfg(feature = "tls")]
199                {
200                    if let TlsOption::WithConfig(tls) = e.tls_opt.clone() {
201                        c = c.tls_config(tls)?;
202                    }
203                }
204
205                endpoints.push(c);
206            }
207
208            Channel::balance_list(endpoints.into_iter())
209        };
210
211        let auth_client = AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone());
212        let kv_client = KvClient::with_interceptor(channel.clone(), auth_interceptor.clone());
213        let watch_client = WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone());
214        let cluster_client =
215            ClusterClient::with_interceptor(channel.clone(), auth_interceptor.clone());
216        let maintenance_client =
217            MaintenanceClient::with_interceptor(channel.clone(), auth_interceptor.clone());
218        let lease_client = LeaseClient::with_interceptor(channel, auth_interceptor);
219
220        Ok(Self {
221            auth_client,
222            kv_client,
223            watch_client,
224            cluster_client,
225            maintenance_client,
226            lease_client,
227        })
228    }
229
230    pub async fn connect(mut cfg: ClientConfig) -> Result<Self> {
231        let cli = Self::connect_with_token(&cfg, None).await?;
232
233        match cfg.auth.take() {
234            Some((name, password)) => {
235                let token = cli.authenticate((name, password)).await?.token;
236
237                Self::connect_with_token(&cfg, Some(token)).await
238            }
239            None => Ok(cli),
240        }
241    }
242}
243
244#[async_trait]
245impl AuthOp for Client {
246    async fn authenticate<R>(&self, req: R) -> Result<AuthenticateResponse>
247    where
248        R: Into<AuthenticateRequest> + Send,
249    {
250        let req = tonic::Request::new(req.into().into());
251        let resp = self.auth_client.clone().authenticate(req).await?;
252
253        Ok(resp.into_inner().into())
254    }
255}
256
257#[async_trait]
258impl KeyValueOp for Client {
259    async fn put<R>(&self, req: R) -> Result<PutResponse>
260    where
261        R: Into<PutRequest> + Send,
262    {
263        let req = tonic::Request::new(req.into().into());
264        let resp = self.kv_client.clone().put(req).await?;
265
266        Ok(resp.into_inner().into())
267    }
268
269    async fn get<R>(&self, req: R) -> Result<RangeResponse>
270    where
271        R: Into<RangeRequest> + Send,
272    {
273        let req = tonic::Request::new(req.into().into());
274        let resp = self.kv_client.clone().range(req).await?;
275
276        Ok(resp.into_inner().into())
277    }
278
279    async fn get_all(&self) -> Result<RangeResponse> {
280        self.get(KeyRange::all()).await
281    }
282
283    async fn get_by_prefix<K>(&self, p: K) -> Result<RangeResponse>
284    where
285        K: Into<Vec<u8>> + Send,
286    {
287        self.get(KeyRange::prefix(p)).await
288    }
289
290    async fn get_range<F, E>(&self, from: F, end: E) -> Result<RangeResponse>
291    where
292        F: Into<Vec<u8>> + Send,
293        E: Into<Vec<u8>> + Send,
294    {
295        self.get(KeyRange::range(from, end)).await
296    }
297
298    async fn delete<R>(&self, req: R) -> Result<DeleteResponse>
299    where
300        R: Into<DeleteRequest> + Send,
301    {
302        let req = tonic::Request::new(req.into().into());
303        let resp = self.kv_client.clone().delete_range(req).await?;
304
305        Ok(resp.into_inner().into())
306    }
307
308    async fn delete_all(&self) -> Result<DeleteResponse> {
309        self.delete(KeyRange::all()).await
310    }
311
312    async fn delete_by_prefix<K>(&self, p: K) -> Result<DeleteResponse>
313    where
314        K: Into<Vec<u8>> + Send,
315    {
316        self.delete(KeyRange::prefix(p)).await
317    }
318
319    async fn delete_range<F, E>(&self, from: F, end: E) -> Result<DeleteResponse>
320    where
321        F: Into<Vec<u8>> + Send,
322        E: Into<Vec<u8>> + Send,
323    {
324        self.delete(KeyRange::range(from, end)).await
325    }
326
327    async fn txn<R>(&self, req: R) -> Result<TxnResponse>
328    where
329        R: Into<TxnRequest> + Send,
330    {
331        let req = tonic::Request::new(req.into().into());
332        let resp = self.kv_client.clone().txn(req).await?;
333
334        Ok(resp.into_inner().into())
335    }
336
337    async fn compact<R>(&self, req: R) -> Result<CompactResponse>
338    where
339        R: Into<CompactRequest> + Send,
340    {
341        let req = tonic::Request::new(req.into().into());
342        let resp = self.kv_client.clone().compact(req).await?;
343
344        Ok(resp.into_inner().into())
345    }
346}
347
348#[async_trait]
349impl WatchOp for Client {
350    async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
351    where
352        R: Into<WatchCreateRequest> + Send,
353    {
354        let (tx, rx) = channel::<etcdserverpb::WatchRequest>(128);
355
356        tx.send(req.into().into()).await?;
357
358        let resp = self
359            .watch_client
360            .clone()
361            .watch(ReceiverStream::new(rx))
362            .await?;
363
364        let mut inbound = resp.into_inner();
365
366        let watch_id = match inbound.message().await? {
367            Some(resp) => {
368                if !resp.created {
369                    return Err(Error::WatchEvent(
370                        "should receive created event at first".to_owned(),
371                    ));
372                }
373                assert!(resp.events.is_empty(), "received created event {:?}", resp);
374                resp.watch_id
375            }
376
377            None => return Err(Error::CreateWatch),
378        };
379
380        Ok((WatchStream::new(inbound), WatchCanceler::new(watch_id, tx)))
381    }
382}
383
384#[async_trait]
385impl LeaseOp for Client {
386    async fn grant_lease<R>(&self, req: R) -> Result<LeaseGrantResponse>
387    where
388        R: Into<LeaseGrantRequest> + Send,
389    {
390        let req = tonic::Request::new(req.into().into());
391        let resp = self.lease_client.clone().lease_grant(req).await?;
392        Ok(resp.into_inner().into())
393    }
394
395    async fn revoke<R>(&self, req: R) -> Result<LeaseRevokeResponse>
396    where
397        R: Into<LeaseRevokeRequest> + Send,
398    {
399        let req = tonic::Request::new(req.into().into());
400        let resp = self.lease_client.clone().lease_revoke(req).await?;
401        Ok(resp.into_inner().into())
402    }
403
404    async fn keep_alive_for(&self, lease_id: LeaseId) -> Result<LeaseKeepAlive> {
405        let (req_tx, req_rx) = channel(1024);
406
407        let req_rx = ReceiverStream::new(req_rx);
408
409        let resp_rx = self
410            .lease_client
411            .clone()
412            .lease_keep_alive(req_rx)
413            .await?
414            .into_inner();
415
416        Ok(LeaseKeepAlive::new(lease_id, req_tx, resp_rx))
417    }
418
419    async fn time_to_live<R>(&self, req: R) -> Result<LeaseTimeToLiveResponse>
420    where
421        R: Into<LeaseTimeToLiveRequest> + Send,
422    {
423        let req = tonic::Request::new(req.into().into());
424        let resp = self.lease_client.clone().lease_time_to_live(req).await?;
425        Ok(resp.into_inner().into())
426    }
427}
428
429#[async_trait]
430impl ClusterOp for Client {
431    async fn member_add<R>(&self, req: R) -> Result<MemberAddResponse>
432    where
433        R: Into<MemberAddRequest> + Send,
434    {
435        let req = tonic::Request::new(req.into().into());
436        let resp = self.cluster_client.clone().member_add(req).await?;
437
438        Ok(resp.into_inner().into())
439    }
440
441    async fn member_remove<R>(&self, req: R) -> Result<MemberRemoveResponse>
442    where
443        R: Into<MemberRemoveRequest> + Send,
444    {
445        let req = tonic::Request::new(req.into().into());
446        let resp = self.cluster_client.clone().member_remove(req).await?;
447
448        Ok(resp.into_inner().into())
449    }
450
451    async fn member_update<R>(&self, req: R) -> Result<MemberUpdateResponse>
452    where
453        R: Into<MemberUpdateRequest> + Send,
454    {
455        let req = tonic::Request::new(req.into().into());
456        let resp = self.cluster_client.clone().member_update(req).await?;
457
458        Ok(resp.into_inner().into())
459    }
460
461    async fn member_list(&self) -> Result<MemberListResponse> {
462        let req = tonic::Request::new(MemberListRequest::new().into());
463        let resp = self.cluster_client.clone().member_list(req).await?;
464
465        Ok(resp.into_inner().into())
466    }
467}