etcd_client/
client.rs

1//! Asynchronous client & synchronous client.
2
3#[cfg(feature = "raw-channel")]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13    RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14    RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15    UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16    UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19    ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20    MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23    CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24    ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27    CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28    KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31    LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32    LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36    AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37    HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use http::HeaderValue;
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::{channel::Change, Endpoint};
53
54const HTTP_PREFIX: &str = "http://";
55const HTTPS_PREFIX: &str = "https://";
56
57/// Asynchronous `etcd` client using v3 API.
58#[derive(Clone)]
59pub struct Client {
60    kv: KvClient,
61    watch: WatchClient,
62    lease: LeaseClient,
63    lock: LockClient,
64    auth: AuthClient,
65    maintenance: MaintenanceClient,
66    cluster: ClusterClient,
67    election: ElectionClient,
68    options: Option<ConnectOptions>,
69    tx: Option<Sender<Change<Uri, Endpoint>>>,
70}
71
72impl Client {
73    /// Connect to `etcd` servers from given `endpoints`.
74    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
75        endpoints: S,
76        options: Option<ConnectOptions>,
77    ) -> Result<Self> {
78        #[cfg(not(feature = "tls-openssl"))]
79        let make_balanced_channel = crate::channel::Tonic;
80        #[cfg(feature = "tls-openssl")]
81        let make_balanced_channel = crate::channel::Openssl {
82            conn: options
83                .clone()
84                .and_then(|o| o.otls)
85                .unwrap_or_else(OpenSslConnector::create_default)?,
86        };
87        Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
88    }
89
90    /// Connect to `etcd` servers from given `endpoints` and a balanced channel.
91    pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
92        endpoints: S,
93        options: Option<ConnectOptions>,
94        make_balanced_channel: MBC,
95    ) -> Result<Self>
96    where
97        MBC: crate::channel::BalancedChannelBuilder,
98        crate::error::Error: From<MBC::Error>,
99    {
100        let endpoints = {
101            let mut eps = Vec::new();
102            for e in endpoints.as_ref() {
103                let channel = Self::build_endpoint(e.as_ref(), &options)?;
104                eps.push(channel);
105            }
106            eps
107        };
108
109        if endpoints.is_empty() {
110            return Err(Error::InvalidArgs(String::from("empty endpoints")));
111        }
112
113        // Always use balance strategy even if there is only one endpoint.
114        let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
115        let channel = InterceptedChannel::new(
116            channel,
117            Interceptor {
118                require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
119            },
120        );
121        for endpoint in endpoints {
122            // The rx inside `channel` won't be closed or dropped here
123            tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
124                .await
125                .unwrap();
126        }
127
128        let mut options = options;
129
130        let auth_token = Arc::new(RwLock::new(None));
131        Self::auth(channel.clone(), &mut options, &auth_token).await?;
132
133        Ok(Self::build_client(channel, Some(tx), auth_token, options))
134    }
135
136    #[cfg(feature = "raw-channel")]
137    /// Connect to `etcd` servers represented by the given `channel`.
138    pub async fn from_channel(channel: Channel, options: Option<ConnectOptions>) -> Result<Self> {
139        let channel = InterceptedChannel::new(
140            channel,
141            Interceptor {
142                require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
143            },
144        );
145        let mut options = options;
146
147        let auth_token = Arc::new(RwLock::new(None));
148        Self::auth(channel.clone(), &mut options, &auth_token).await?;
149
150        Ok(Self::build_client(channel, None, auth_token, options))
151    }
152
153    fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
154        use tonic::transport::Channel as TonicChannel;
155        let mut endpoint = if url.starts_with(HTTP_PREFIX) {
156            #[cfg(feature = "tls")]
157            if let Some(connect_options) = options {
158                if connect_options.tls.is_some() {
159                    return Err(Error::InvalidArgs(String::from(
160                        "TLS options are only supported with HTTPS URLs",
161                    )));
162                }
163            }
164
165            TonicChannel::builder(url.parse()?)
166        } else if url.starts_with(HTTPS_PREFIX) {
167            #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
168            return Err(Error::InvalidArgs(String::from(
169                "HTTPS URLs are only supported with the feature \"tls\"",
170            )));
171
172            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
173            {
174                TonicChannel::builder(url.parse()?)
175            }
176
177            #[cfg(feature = "tls")]
178            {
179                let tls = if let Some(connect_options) = options {
180                    connect_options.tls.clone()
181                } else {
182                    None
183                }
184                .unwrap_or_else(TlsOptions::new);
185
186                TonicChannel::builder(url.parse()?).tls_config(tls)?
187            }
188        } else {
189            #[cfg(feature = "tls")]
190            {
191                let tls = if let Some(connect_options) = options {
192                    connect_options.tls.clone()
193                } else {
194                    None
195                };
196
197                match tls {
198                    Some(tls) => {
199                        let e = HTTPS_PREFIX.to_owned() + url;
200                        TonicChannel::builder(e.parse()?).tls_config(tls)?
201                    }
202                    None => {
203                        let e = HTTP_PREFIX.to_owned() + url;
204                        TonicChannel::builder(e.parse()?)
205                    }
206                }
207            }
208
209            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
210            {
211                let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
212                    HTTPS_PREFIX
213                } else {
214                    HTTP_PREFIX
215                };
216                let e = pfx.to_owned() + url;
217                TonicChannel::builder(e.parse()?)
218            }
219
220            #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
221            {
222                let e = HTTP_PREFIX.to_owned() + url;
223                TonicChannel::builder(e.parse()?)
224            }
225        };
226
227        if let Some(opts) = options {
228            if let Some((interval, timeout)) = opts.keep_alive {
229                endpoint = endpoint
230                    .keep_alive_while_idle(opts.keep_alive_while_idle)
231                    .http2_keep_alive_interval(interval)
232                    .keep_alive_timeout(timeout);
233            }
234
235            if let Some(timeout) = opts.timeout {
236                endpoint = endpoint.timeout(timeout);
237            }
238
239            if let Some(timeout) = opts.connect_timeout {
240                endpoint = endpoint.connect_timeout(timeout);
241            }
242
243            if let Some(tcp_keepalive) = opts.tcp_keepalive {
244                endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
245            }
246        }
247
248        Ok(endpoint)
249    }
250
251    async fn auth(
252        channel: InterceptedChannel,
253        options: &mut Option<ConnectOptions>,
254        auth_token: &Arc<RwLock<Option<HeaderValue>>>,
255    ) -> Result<()> {
256        let user = match options {
257            None => return Ok(()),
258            Some(opt) => {
259                // Take away the user, the password should not be stored in client.
260                opt.user.take()
261            }
262        };
263
264        if let Some((name, password)) = user {
265            let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
266            let resp = tmp_auth.authenticate(name, password).await?;
267            auth_token.write_unpoisoned().replace(resp.token().parse()?);
268        }
269
270        Ok(())
271    }
272
273    fn build_client(
274        channel: InterceptedChannel,
275        tx: Option<Sender<Change<Uri, Endpoint>>>,
276        auth_token: Arc<RwLock<Option<HeaderValue>>>,
277        options: Option<ConnectOptions>,
278    ) -> Self {
279        let kv = KvClient::new(channel.clone(), auth_token.clone());
280        let watch = WatchClient::new(channel.clone(), auth_token.clone());
281        let lease = LeaseClient::new(channel.clone(), auth_token.clone());
282        let lock = LockClient::new(channel.clone(), auth_token.clone());
283        let auth = AuthClient::new(channel.clone(), auth_token.clone());
284        let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
285        let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
286        let election = ElectionClient::new(channel, auth_token);
287
288        Self {
289            kv,
290            watch,
291            lease,
292            lock,
293            auth,
294            maintenance,
295            cluster,
296            election,
297            options,
298            tx,
299        }
300    }
301
302    /// Dynamically add an endpoint to the client.
303    ///
304    /// Which can be used to add a new member to the underlying balance cache.
305    /// The typical scenario is that application can use a services discovery
306    /// to discover the member list changes and add/remove them to/from the client.
307    ///
308    /// Note that the [`Client`] doesn't check the authentication before added.
309    /// So the etcd member of the added endpoint REQUIRES to use the same auth
310    /// token as when create the client. Otherwise, the underlying balance
311    /// services will not be able to connect to the new endpoint.
312    #[inline]
313    pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
314        let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
315        let Some(tx) = &self.tx else {
316            return Err(Error::EndpointsNotManaged);
317        };
318        tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
319            .await
320            .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {e}")))
321    }
322
323    /// Dynamically remove an endpoint from the client.
324    ///
325    /// Note that the `endpoint` str should be the same as it was added.
326    /// And the underlying balance services cache used the hash from the Uri,
327    /// which was parsed from `endpoint` str, to do the equality comparisons.
328    #[inline]
329    pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
330        let uri = http::Uri::from_str(endpoint.as_ref())?;
331        let Some(tx) = &self.tx else {
332            return Err(Error::EndpointsNotManaged);
333        };
334        tx.send(Change::Remove(uri))
335            .await
336            .map_err(|e| Error::EndpointError(format!("failed to remove endpoint because of {e}")))
337    }
338
339    /// Gets a KV client.
340    #[inline]
341    pub fn kv_client(&self) -> KvClient {
342        self.kv.clone()
343    }
344
345    /// Gets a watch client.
346    #[inline]
347    pub fn watch_client(&self) -> WatchClient {
348        self.watch.clone()
349    }
350
351    /// Gets a lease client.
352    #[inline]
353    pub fn lease_client(&self) -> LeaseClient {
354        self.lease.clone()
355    }
356
357    /// Gets an auth client.
358    #[inline]
359    pub fn auth_client(&self) -> AuthClient {
360        self.auth.clone()
361    }
362
363    /// Gets a maintenance client.
364    #[inline]
365    pub fn maintenance_client(&self) -> MaintenanceClient {
366        self.maintenance.clone()
367    }
368
369    /// Gets a cluster client.
370    #[inline]
371    pub fn cluster_client(&self) -> ClusterClient {
372        self.cluster.clone()
373    }
374
375    /// Gets a lock client.
376    #[inline]
377    pub fn lock_client(&self) -> LockClient {
378        self.lock.clone()
379    }
380
381    /// Gets a election client.
382    #[inline]
383    pub fn election_client(&self) -> ElectionClient {
384        self.election.clone()
385    }
386
387    /// Put the given key into the key-value store.
388    /// A put request increments the revision of the key-value store
389    /// and generates one event in the event history.
390    #[inline]
391    pub async fn put(
392        &mut self,
393        key: impl Into<Vec<u8>>,
394        value: impl Into<Vec<u8>>,
395        options: Option<PutOptions>,
396    ) -> Result<PutResponse> {
397        self.kv.put(key, value, options).await
398    }
399
400    /// Gets the key from the key-value store.
401    #[inline]
402    pub async fn get(
403        &mut self,
404        key: impl Into<Vec<u8>>,
405        options: Option<GetOptions>,
406    ) -> Result<GetResponse> {
407        self.kv.get(key, options).await
408    }
409
410    /// Deletes the given key from the key-value store.
411    #[inline]
412    pub async fn delete(
413        &mut self,
414        key: impl Into<Vec<u8>>,
415        options: Option<DeleteOptions>,
416    ) -> Result<DeleteResponse> {
417        self.kv.delete(key, options).await
418    }
419
420    /// Compacts the event history in the etcd key-value store. The key-value
421    /// store should be periodically compacted or the event history will continue to grow
422    /// indefinitely.
423    #[inline]
424    pub async fn compact(
425        &mut self,
426        revision: i64,
427        options: Option<CompactionOptions>,
428    ) -> Result<CompactionResponse> {
429        self.kv.compact(revision, options).await
430    }
431
432    /// Processes multiple operations in a single transaction.
433    /// A txn request increments the revision of the key-value store
434    /// and generates events with the same revision for every completed operation.
435    /// It is not allowed to modify the same key several times within one txn.
436    #[inline]
437    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
438        self.kv.txn(txn).await
439    }
440
441    /// Watches for events happening or that have happened. Both input and output
442    /// are streams; the input stream is for creating and canceling watcher and the output
443    /// stream sends events. The entire event history can be watched starting from the
444    /// last compaction revision.
445    #[inline]
446    pub async fn watch(
447        &mut self,
448        key: impl Into<Vec<u8>>,
449        options: Option<WatchOptions>,
450    ) -> Result<(Watcher, WatchStream)> {
451        self.watch.watch(key, options).await
452    }
453
454    /// Creates a lease which expires if the server does not receive a keepAlive
455    /// within a given time to live period. All keys attached to the lease will be expired and
456    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
457    #[inline]
458    pub async fn lease_grant(
459        &mut self,
460        ttl: i64,
461        options: Option<LeaseGrantOptions>,
462    ) -> Result<LeaseGrantResponse> {
463        self.lease.grant(ttl, options).await
464    }
465
466    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
467    #[inline]
468    pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
469        self.lease.revoke(id).await
470    }
471
472    /// Keeps the lease alive by streaming keep alive requests from the client
473    /// to the server and streaming keep alive responses from the server to the client.
474    #[inline]
475    pub async fn lease_keep_alive(
476        &mut self,
477        id: i64,
478    ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
479        self.lease.keep_alive(id).await
480    }
481
482    /// Retrieves lease information.
483    #[inline]
484    pub async fn lease_time_to_live(
485        &mut self,
486        id: i64,
487        options: Option<LeaseTimeToLiveOptions>,
488    ) -> Result<LeaseTimeToLiveResponse> {
489        self.lease.time_to_live(id, options).await
490    }
491
492    /// Lists all existing leases.
493    #[inline]
494    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
495        self.lease.leases().await
496    }
497
498    /// Lock acquires a distributed shared lock on a given named lock.
499    /// On success, it will return a unique key that exists so long as the
500    /// lock is held by the caller. This key can be used in conjunction with
501    /// transactions to safely ensure updates to etcd only occur while holding
502    /// lock ownership. The lock is held until Unlock is called on the key or the
503    /// lease associate with the owner expires.
504    #[inline]
505    pub async fn lock(
506        &mut self,
507        name: impl Into<Vec<u8>>,
508        options: Option<LockOptions>,
509    ) -> Result<LockResponse> {
510        self.lock.lock(name, options).await
511    }
512
513    /// Unlock takes a key returned by Lock and releases the hold on lock. The
514    /// next Lock caller waiting for the lock will then be woken up and given
515    /// ownership of the lock.
516    #[inline]
517    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
518        self.lock.unlock(key).await
519    }
520
521    /// Enables authentication.
522    #[inline]
523    pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
524        self.auth.auth_enable().await
525    }
526
527    /// Disables authentication.
528    #[inline]
529    pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
530        self.auth.auth_disable().await
531    }
532
533    /// Adds role.
534    #[inline]
535    pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
536        self.auth.role_add(name).await
537    }
538
539    /// Deletes role.
540    #[inline]
541    pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
542        self.auth.role_delete(name).await
543    }
544
545    /// Gets role.
546    #[inline]
547    pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
548        self.auth.role_get(name).await
549    }
550
551    /// Lists role.
552    #[inline]
553    pub async fn role_list(&mut self) -> Result<RoleListResponse> {
554        self.auth.role_list().await
555    }
556
557    /// Grants role permission.
558    #[inline]
559    pub async fn role_grant_permission(
560        &mut self,
561        name: impl Into<String>,
562        perm: Permission,
563    ) -> Result<RoleGrantPermissionResponse> {
564        self.auth.role_grant_permission(name, perm).await
565    }
566
567    /// Revokes role permission.
568    #[inline]
569    pub async fn role_revoke_permission(
570        &mut self,
571        name: impl Into<String>,
572        key: impl Into<Vec<u8>>,
573        options: Option<RoleRevokePermissionOptions>,
574    ) -> Result<RoleRevokePermissionResponse> {
575        self.auth.role_revoke_permission(name, key, options).await
576    }
577
578    /// Add an user.
579    #[inline]
580    pub async fn user_add(
581        &mut self,
582        name: impl Into<String>,
583        password: impl Into<String>,
584        options: Option<UserAddOptions>,
585    ) -> Result<UserAddResponse> {
586        self.auth.user_add(name, password, options).await
587    }
588
589    /// Gets the user info by the user name.
590    #[inline]
591    pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
592        self.auth.user_get(name).await
593    }
594
595    /// Lists all users.
596    #[inline]
597    pub async fn user_list(&mut self) -> Result<UserListResponse> {
598        self.auth.user_list().await
599    }
600
601    /// Deletes the given key from the key-value store.
602    #[inline]
603    pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
604        self.auth.user_delete(name).await
605    }
606
607    /// Change password for an user.
608    #[inline]
609    pub async fn user_change_password(
610        &mut self,
611        name: impl Into<String>,
612        password: impl Into<String>,
613    ) -> Result<UserChangePasswordResponse> {
614        self.auth.user_change_password(name, password).await
615    }
616
617    /// Grant role for an user.
618    #[inline]
619    pub async fn user_grant_role(
620        &mut self,
621        user: impl Into<String>,
622        role: impl Into<String>,
623    ) -> Result<UserGrantRoleResponse> {
624        self.auth.user_grant_role(user, role).await
625    }
626
627    /// Revoke role for an user.
628    #[inline]
629    pub async fn user_revoke_role(
630        &mut self,
631        user: impl Into<String>,
632        role: impl Into<String>,
633    ) -> Result<UserRevokeRoleResponse> {
634        self.auth.user_revoke_role(user, role).await
635    }
636
637    /// Maintain(get, active or inactive) alarms of members.
638    #[inline]
639    pub async fn alarm(
640        &mut self,
641        alarm_action: AlarmAction,
642        alarm_type: AlarmType,
643        options: Option<AlarmOptions>,
644    ) -> Result<AlarmResponse> {
645        self.maintenance
646            .alarm(alarm_action, alarm_type, options)
647            .await
648    }
649
650    /// Gets the status of a member.
651    #[inline]
652    pub async fn status(&mut self) -> Result<StatusResponse> {
653        self.maintenance.status().await
654    }
655
656    /// Defragments a member's backend database to recover storage space.
657    #[inline]
658    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
659        self.maintenance.defragment().await
660    }
661
662    /// Computes the hash of whole backend keyspace.
663    /// including key, lease, and other buckets in storage.
664    /// This is designed for testing ONLY!
665    #[inline]
666    pub async fn hash(&mut self) -> Result<HashResponse> {
667        self.maintenance.hash().await
668    }
669
670    /// Computes the hash of all MVCC keys up to a given revision.
671    /// It only iterates \"key\" bucket in backend storage.
672    #[inline]
673    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
674        self.maintenance.hash_kv(revision).await
675    }
676
677    /// Gets a snapshot of the entire backend from a member over a stream to a client.
678    #[inline]
679    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
680        self.maintenance.snapshot().await
681    }
682
683    /// Adds current connected server as a member.
684    #[inline]
685    pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
686        &mut self,
687        urls: S,
688        options: Option<MemberAddOptions>,
689    ) -> Result<MemberAddResponse> {
690        let mut eps = Vec::new();
691        for e in urls.as_ref() {
692            let e = e.as_ref();
693            let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
694                e.to_string()
695            } else {
696                HTTP_PREFIX.to_owned() + e
697            };
698            eps.push(url);
699        }
700
701        self.cluster.member_add(eps, options).await
702    }
703
704    /// Remove a member.
705    #[inline]
706    pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
707        self.cluster.member_remove(id).await
708    }
709
710    /// Updates the member.
711    #[inline]
712    pub async fn member_update(
713        &mut self,
714        id: u64,
715        url: impl Into<Vec<String>>,
716    ) -> Result<MemberUpdateResponse> {
717        self.cluster.member_update(id, url).await
718    }
719
720    /// Promotes the member.
721    #[inline]
722    pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
723        self.cluster.member_promote(id).await
724    }
725
726    /// Lists members.
727    #[inline]
728    pub async fn member_list(&mut self) -> Result<MemberListResponse> {
729        self.cluster.member_list().await
730    }
731
732    /// Moves the current leader node to target node.
733    #[inline]
734    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
735        self.maintenance.move_leader(target_id).await
736    }
737
738    /// Puts a value as eligible for the election on the prefix key.
739    /// Multiple sessions can participate in the election for the
740    /// same prefix, but only one can be the leader at a time.
741    #[inline]
742    pub async fn campaign(
743        &mut self,
744        name: impl Into<Vec<u8>>,
745        value: impl Into<Vec<u8>>,
746        lease: i64,
747    ) -> Result<CampaignResponse> {
748        self.election.campaign(name, value, lease).await
749    }
750
751    /// Lets the leader announce a new value without another election.
752    #[inline]
753    pub async fn proclaim(
754        &mut self,
755        value: impl Into<Vec<u8>>,
756        options: Option<ProclaimOptions>,
757    ) -> Result<ProclaimResponse> {
758        self.election.proclaim(value, options).await
759    }
760
761    /// Returns the leader value for the current election.
762    #[inline]
763    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
764        self.election.leader(name).await
765    }
766
767    /// Returns a channel that reliably observes ordered leader proposals
768    /// as GetResponse values on every current elected leader key.
769    #[inline]
770    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
771        self.election.observe(name).await
772    }
773
774    /// Releases election leadership and then start a new election
775    #[inline]
776    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
777        self.election.resign(option).await
778    }
779
780    /// Sets client-side authentication.
781    pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
782        self.auth.set_client_auth(name, password).await
783    }
784
785    /// Removes client-side authentication.
786    pub fn remove_client_auth(&mut self) {
787        self.auth.remove_client_auth();
788    }
789}
790
791/// Options for `Connect` operation.
792#[derive(Debug, Default, Clone)]
793pub struct ConnectOptions {
794    /// user is a pair values of name and password
795    user: Option<(String, String)>,
796    /// HTTP2 keep-alive: (keep_alive_interval, keep_alive_timeout)
797    keep_alive: Option<(Duration, Duration)>,
798    /// Whether send keep alive pings even there are no active streams.
799    keep_alive_while_idle: bool,
800    /// Apply a timeout to each gRPC request.
801    timeout: Option<Duration>,
802    /// Apply a timeout to connecting to the endpoint.
803    connect_timeout: Option<Duration>,
804    /// TCP keepalive.
805    tcp_keepalive: Option<Duration>,
806    #[cfg(feature = "tls")]
807    tls: Option<TlsOptions>,
808    #[cfg(feature = "tls-openssl")]
809    otls: Option<OpenSslResult<OpenSslConnector>>,
810    /// Require a leader to be present for the operation to complete.
811    require_leader: bool,
812}
813
814impl ConnectOptions {
815    /// name is the identifier for the distributed shared lock to be acquired.
816    #[inline]
817    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
818        self.user = Some((name.into(), password.into()));
819        self
820    }
821
822    /// Sets TLS options.
823    ///
824    /// Notes that this function have to work with `HTTPS` URLs.
825    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
826    #[cfg(feature = "tls")]
827    #[inline]
828    pub fn with_tls(mut self, tls: TlsOptions) -> Self {
829        self.tls = Some(tls);
830        self
831    }
832
833    /// Sets TLS options, however using the OpenSSL implementation.
834    #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
835    #[cfg(feature = "tls-openssl")]
836    #[inline]
837    pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
838        // NOTE1: Perhaps we can unify the essential TLS config terms by something like `TlsBuilder`?
839        //
840        // NOTE2: we delay the checking at connection step to keep consistency with tonic, however would
841        // things be better if we validate the config at here?
842        self.otls = Some(otls.build());
843        self
844    }
845
846    /// Enable HTTP2 keep-alive with `interval` and `timeout`.
847    #[inline]
848    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
849        self.keep_alive = Some((interval, timeout));
850        self
851    }
852
853    /// Apply a timeout to each request.
854    #[inline]
855    pub fn with_timeout(mut self, timeout: Duration) -> Self {
856        self.timeout = Some(timeout);
857        self
858    }
859
860    /// Apply a timeout to connecting to the endpoint.
861    #[inline]
862    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
863        self.connect_timeout = Some(timeout);
864        self
865    }
866
867    /// Enable TCP keepalive.
868    #[inline]
869    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
870        self.tcp_keepalive = Some(tcp_keepalive);
871        self
872    }
873
874    /// Whether send keep alive pings even there are no active requests.
875    /// If disabled, keep-alive pings are only sent while there are opened request/response streams.
876    /// If enabled, pings are also sent when no streams are active.
877    /// NOTE: Some implementations of gRPC server may send GOAWAY if there are too many pings.
878    ///       This would be useful if you meet some error like `too many pings`.
879    #[inline]
880    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
881        self.keep_alive_while_idle = enabled;
882        self
883    }
884
885    /// Whether to enforce that a leader be present in the etcd cluster.
886    #[inline]
887    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
888        self.require_leader = require_leader;
889        self
890    }
891
892    /// Creates a `ConnectOptions`.
893    #[inline]
894    pub const fn new() -> Self {
895        ConnectOptions {
896            user: None,
897            keep_alive: None,
898            keep_alive_while_idle: true,
899            timeout: None,
900            connect_timeout: None,
901            tcp_keepalive: None,
902            #[cfg(feature = "tls")]
903            tls: None,
904            #[cfg(feature = "tls-openssl")]
905            otls: None,
906            require_leader: false,
907        }
908    }
909}