etcd_client/
client.rs

1//! Asynchronous client & synchronous client.
2
3#[cfg(not(feature = "tls-openssl"))]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{self, OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13    RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14    RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15    UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16    UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19    ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20    MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23    CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24    ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27    CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28    KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31    LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32    LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36    AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37    HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use http::HeaderValue;
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::Endpoint;
53
54use tower::discover::Change;
55
/// Scheme prefix recognised (and prepended by default) for plain-text endpoints.
const HTTP_PREFIX: &str = "http://";
/// Scheme prefix recognised for TLS-secured endpoints.
const HTTPS_PREFIX: &str = "https://";
58
59/// Asynchronous `etcd` client using v3 API.
60#[derive(Clone)]
61pub struct Client {
62    kv: KvClient,
63    watch: WatchClient,
64    lease: LeaseClient,
65    lock: LockClient,
66    auth: AuthClient,
67    maintenance: MaintenanceClient,
68    cluster: ClusterClient,
69    election: ElectionClient,
70    options: Option<ConnectOptions>,
71    tx: Sender<Change<Uri, Endpoint>>,
72}
73
74impl Client {
75    /// Connect to `etcd` servers from given `endpoints`.
76    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
77        endpoints: S,
78        options: Option<ConnectOptions>,
79    ) -> Result<Self> {
80        let endpoints = {
81            let mut eps = Vec::new();
82            for e in endpoints.as_ref() {
83                let channel = Self::build_endpoint(e.as_ref(), &options)?;
84                eps.push(channel);
85            }
86            eps
87        };
88
89        if endpoints.is_empty() {
90            return Err(Error::InvalidArgs(String::from("empty endpoints")));
91        }
92
93        // Always use balance strategy even if there is only one endpoint.
94        #[cfg(not(feature = "tls-openssl"))]
95        let (channel, tx) = Channel::balance_channel(64);
96        #[cfg(feature = "tls-openssl")]
97        let (channel, tx) = openssl_tls::balanced_channel(
98            options
99                .clone()
100                .and_then(|o| o.otls)
101                .unwrap_or_else(OpenSslConnector::create_default)?,
102        )?;
103        let channel = InterceptedChannel::new(
104            channel,
105            Interceptor {
106                require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
107            },
108        );
109        for endpoint in endpoints {
110            // The rx inside `channel` won't be closed or dropped here
111            tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
112                .await
113                .unwrap();
114        }
115
116        let mut options = options;
117
118        let auth_token = Arc::new(RwLock::new(None));
119        Self::auth(channel.clone(), &mut options, &auth_token).await?;
120
121        Ok(Self::build_client(channel, tx, auth_token, options))
122    }
123
124    fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
125        #[cfg(feature = "tls-openssl")]
126        use tonic::transport::Channel;
127        let mut endpoint = if url.starts_with(HTTP_PREFIX) {
128            #[cfg(feature = "tls")]
129            if let Some(connect_options) = options {
130                if connect_options.tls.is_some() {
131                    return Err(Error::InvalidArgs(String::from(
132                        "TLS options are only supported with HTTPS URLs",
133                    )));
134                }
135            }
136
137            Channel::builder(url.parse()?)
138        } else if url.starts_with(HTTPS_PREFIX) {
139            #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
140            return Err(Error::InvalidArgs(String::from(
141                "HTTPS URLs are only supported with the feature \"tls\"",
142            )));
143
144            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
145            {
146                Channel::builder(url.parse()?)
147            }
148
149            #[cfg(feature = "tls")]
150            {
151                let tls = if let Some(connect_options) = options {
152                    connect_options.tls.clone()
153                } else {
154                    None
155                }
156                .unwrap_or_else(TlsOptions::new);
157
158                Channel::builder(url.parse()?).tls_config(tls)?
159            }
160        } else {
161            #[cfg(feature = "tls")]
162            {
163                let tls = if let Some(connect_options) = options {
164                    connect_options.tls.clone()
165                } else {
166                    None
167                };
168
169                match tls {
170                    Some(tls) => {
171                        let e = HTTPS_PREFIX.to_owned() + url;
172                        Channel::builder(e.parse()?).tls_config(tls)?
173                    }
174                    None => {
175                        let e = HTTP_PREFIX.to_owned() + url;
176                        Channel::builder(e.parse()?)
177                    }
178                }
179            }
180
181            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
182            {
183                let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
184                    HTTPS_PREFIX
185                } else {
186                    HTTP_PREFIX
187                };
188                let e = pfx.to_owned() + url;
189                Channel::builder(e.parse()?)
190            }
191
192            #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
193            {
194                let e = HTTP_PREFIX.to_owned() + url;
195                Channel::builder(e.parse()?)
196            }
197        };
198
199        if let Some(opts) = options {
200            if let Some((interval, timeout)) = opts.keep_alive {
201                endpoint = endpoint
202                    .keep_alive_while_idle(opts.keep_alive_while_idle)
203                    .http2_keep_alive_interval(interval)
204                    .keep_alive_timeout(timeout);
205            }
206
207            if let Some(timeout) = opts.timeout {
208                endpoint = endpoint.timeout(timeout);
209            }
210
211            if let Some(timeout) = opts.connect_timeout {
212                endpoint = endpoint.connect_timeout(timeout);
213            }
214
215            if let Some(tcp_keepalive) = opts.tcp_keepalive {
216                endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
217            }
218        }
219
220        Ok(endpoint)
221    }
222
223    async fn auth(
224        channel: InterceptedChannel,
225        options: &mut Option<ConnectOptions>,
226        auth_token: &Arc<RwLock<Option<HeaderValue>>>,
227    ) -> Result<()> {
228        let user = match options {
229            None => return Ok(()),
230            Some(opt) => {
231                // Take away the user, the password should not be stored in client.
232                opt.user.take()
233            }
234        };
235
236        if let Some((name, password)) = user {
237            let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
238            let resp = tmp_auth.authenticate(name, password).await?;
239            auth_token.write_unpoisoned().replace(resp.token().parse()?);
240        }
241
242        Ok(())
243    }
244
245    fn build_client(
246        channel: InterceptedChannel,
247        tx: Sender<Change<Uri, Endpoint>>,
248        auth_token: Arc<RwLock<Option<HeaderValue>>>,
249        options: Option<ConnectOptions>,
250    ) -> Self {
251        let kv = KvClient::new(channel.clone(), auth_token.clone());
252        let watch = WatchClient::new(channel.clone(), auth_token.clone());
253        let lease = LeaseClient::new(channel.clone(), auth_token.clone());
254        let lock = LockClient::new(channel.clone(), auth_token.clone());
255        let auth = AuthClient::new(channel.clone(), auth_token.clone());
256        let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
257        let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
258        let election = ElectionClient::new(channel, auth_token);
259
260        Self {
261            kv,
262            watch,
263            lease,
264            lock,
265            auth,
266            maintenance,
267            cluster,
268            election,
269            options,
270            tx,
271        }
272    }
273
274    /// Dynamically add an endpoint to the client.
275    ///
276    /// Which can be used to add a new member to the underlying balance cache.
277    /// The typical scenario is that application can use a services discovery
278    /// to discover the member list changes and add/remove them to/from the client.
279    ///
280    /// Note that the [`Client`] doesn't check the authentication before added.
281    /// So the etcd member of the added endpoint REQUIRES to use the same auth
282    /// token as when create the client. Otherwise, the underlying balance
283    /// services will not be able to connect to the new endpoint.
284    #[inline]
285    pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
286        let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
287        let tx = &self.tx;
288        tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
289            .await
290            .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {}", e)))
291    }
292
293    /// Dynamically remove an endpoint from the client.
294    ///
295    /// Note that the `endpoint` str should be the same as it was added.
296    /// And the underlying balance services cache used the hash from the Uri,
297    /// which was parsed from `endpoint` str, to do the equality comparisons.
298    #[inline]
299    pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
300        let uri = http::Uri::from_str(endpoint.as_ref())?;
301        let tx = &self.tx;
302        tx.send(Change::Remove(uri)).await.map_err(|e| {
303            Error::EndpointError(format!("failed to remove endpoint because of {}", e))
304        })
305    }
306
307    /// Gets a KV client.
308    #[inline]
309    pub fn kv_client(&self) -> KvClient {
310        self.kv.clone()
311    }
312
313    /// Gets a watch client.
314    #[inline]
315    pub fn watch_client(&self) -> WatchClient {
316        self.watch.clone()
317    }
318
319    /// Gets a lease client.
320    #[inline]
321    pub fn lease_client(&self) -> LeaseClient {
322        self.lease.clone()
323    }
324
325    /// Gets an auth client.
326    #[inline]
327    pub fn auth_client(&self) -> AuthClient {
328        self.auth.clone()
329    }
330
331    /// Gets a maintenance client.
332    #[inline]
333    pub fn maintenance_client(&self) -> MaintenanceClient {
334        self.maintenance.clone()
335    }
336
337    /// Gets a cluster client.
338    #[inline]
339    pub fn cluster_client(&self) -> ClusterClient {
340        self.cluster.clone()
341    }
342
343    /// Gets a lock client.
344    #[inline]
345    pub fn lock_client(&self) -> LockClient {
346        self.lock.clone()
347    }
348
349    /// Gets a election client.
350    #[inline]
351    pub fn election_client(&self) -> ElectionClient {
352        self.election.clone()
353    }
354
355    /// Put the given key into the key-value store.
356    /// A put request increments the revision of the key-value store
357    /// and generates one event in the event history.
358    #[inline]
359    pub async fn put(
360        &mut self,
361        key: impl Into<Vec<u8>>,
362        value: impl Into<Vec<u8>>,
363        options: Option<PutOptions>,
364    ) -> Result<PutResponse> {
365        self.kv.put(key, value, options).await
366    }
367
368    /// Gets the key from the key-value store.
369    #[inline]
370    pub async fn get(
371        &mut self,
372        key: impl Into<Vec<u8>>,
373        options: Option<GetOptions>,
374    ) -> Result<GetResponse> {
375        self.kv.get(key, options).await
376    }
377
378    /// Deletes the given key from the key-value store.
379    #[inline]
380    pub async fn delete(
381        &mut self,
382        key: impl Into<Vec<u8>>,
383        options: Option<DeleteOptions>,
384    ) -> Result<DeleteResponse> {
385        self.kv.delete(key, options).await
386    }
387
388    /// Compacts the event history in the etcd key-value store. The key-value
389    /// store should be periodically compacted or the event history will continue to grow
390    /// indefinitely.
391    #[inline]
392    pub async fn compact(
393        &mut self,
394        revision: i64,
395        options: Option<CompactionOptions>,
396    ) -> Result<CompactionResponse> {
397        self.kv.compact(revision, options).await
398    }
399
400    /// Processes multiple operations in a single transaction.
401    /// A txn request increments the revision of the key-value store
402    /// and generates events with the same revision for every completed operation.
403    /// It is not allowed to modify the same key several times within one txn.
404    #[inline]
405    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
406        self.kv.txn(txn).await
407    }
408
409    /// Watches for events happening or that have happened. Both input and output
410    /// are streams; the input stream is for creating and canceling watcher and the output
411    /// stream sends events. The entire event history can be watched starting from the
412    /// last compaction revision.
413    #[inline]
414    pub async fn watch(
415        &mut self,
416        key: impl Into<Vec<u8>>,
417        options: Option<WatchOptions>,
418    ) -> Result<(Watcher, WatchStream)> {
419        self.watch.watch(key, options).await
420    }
421
422    /// Creates a lease which expires if the server does not receive a keepAlive
423    /// within a given time to live period. All keys attached to the lease will be expired and
424    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
425    #[inline]
426    pub async fn lease_grant(
427        &mut self,
428        ttl: i64,
429        options: Option<LeaseGrantOptions>,
430    ) -> Result<LeaseGrantResponse> {
431        self.lease.grant(ttl, options).await
432    }
433
434    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
435    #[inline]
436    pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
437        self.lease.revoke(id).await
438    }
439
440    /// Keeps the lease alive by streaming keep alive requests from the client
441    /// to the server and streaming keep alive responses from the server to the client.
442    #[inline]
443    pub async fn lease_keep_alive(
444        &mut self,
445        id: i64,
446    ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
447        self.lease.keep_alive(id).await
448    }
449
450    /// Retrieves lease information.
451    #[inline]
452    pub async fn lease_time_to_live(
453        &mut self,
454        id: i64,
455        options: Option<LeaseTimeToLiveOptions>,
456    ) -> Result<LeaseTimeToLiveResponse> {
457        self.lease.time_to_live(id, options).await
458    }
459
460    /// Lists all existing leases.
461    #[inline]
462    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
463        self.lease.leases().await
464    }
465
466    /// Lock acquires a distributed shared lock on a given named lock.
467    /// On success, it will return a unique key that exists so long as the
468    /// lock is held by the caller. This key can be used in conjunction with
469    /// transactions to safely ensure updates to etcd only occur while holding
470    /// lock ownership. The lock is held until Unlock is called on the key or the
471    /// lease associate with the owner expires.
472    #[inline]
473    pub async fn lock(
474        &mut self,
475        name: impl Into<Vec<u8>>,
476        options: Option<LockOptions>,
477    ) -> Result<LockResponse> {
478        self.lock.lock(name, options).await
479    }
480
481    /// Unlock takes a key returned by Lock and releases the hold on lock. The
482    /// next Lock caller waiting for the lock will then be woken up and given
483    /// ownership of the lock.
484    #[inline]
485    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
486        self.lock.unlock(key).await
487    }
488
489    /// Enables authentication.
490    #[inline]
491    pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
492        self.auth.auth_enable().await
493    }
494
495    /// Disables authentication.
496    #[inline]
497    pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
498        self.auth.auth_disable().await
499    }
500
501    /// Adds role.
502    #[inline]
503    pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
504        self.auth.role_add(name).await
505    }
506
507    /// Deletes role.
508    #[inline]
509    pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
510        self.auth.role_delete(name).await
511    }
512
513    /// Gets role.
514    #[inline]
515    pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
516        self.auth.role_get(name).await
517    }
518
519    /// Lists role.
520    #[inline]
521    pub async fn role_list(&mut self) -> Result<RoleListResponse> {
522        self.auth.role_list().await
523    }
524
525    /// Grants role permission.
526    #[inline]
527    pub async fn role_grant_permission(
528        &mut self,
529        name: impl Into<String>,
530        perm: Permission,
531    ) -> Result<RoleGrantPermissionResponse> {
532        self.auth.role_grant_permission(name, perm).await
533    }
534
535    /// Revokes role permission.
536    #[inline]
537    pub async fn role_revoke_permission(
538        &mut self,
539        name: impl Into<String>,
540        key: impl Into<Vec<u8>>,
541        options: Option<RoleRevokePermissionOptions>,
542    ) -> Result<RoleRevokePermissionResponse> {
543        self.auth.role_revoke_permission(name, key, options).await
544    }
545
546    /// Add an user.
547    #[inline]
548    pub async fn user_add(
549        &mut self,
550        name: impl Into<String>,
551        password: impl Into<String>,
552        options: Option<UserAddOptions>,
553    ) -> Result<UserAddResponse> {
554        self.auth.user_add(name, password, options).await
555    }
556
557    /// Gets the user info by the user name.
558    #[inline]
559    pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
560        self.auth.user_get(name).await
561    }
562
563    /// Lists all users.
564    #[inline]
565    pub async fn user_list(&mut self) -> Result<UserListResponse> {
566        self.auth.user_list().await
567    }
568
569    /// Deletes the given key from the key-value store.
570    #[inline]
571    pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
572        self.auth.user_delete(name).await
573    }
574
575    /// Change password for an user.
576    #[inline]
577    pub async fn user_change_password(
578        &mut self,
579        name: impl Into<String>,
580        password: impl Into<String>,
581    ) -> Result<UserChangePasswordResponse> {
582        self.auth.user_change_password(name, password).await
583    }
584
585    /// Grant role for an user.
586    #[inline]
587    pub async fn user_grant_role(
588        &mut self,
589        user: impl Into<String>,
590        role: impl Into<String>,
591    ) -> Result<UserGrantRoleResponse> {
592        self.auth.user_grant_role(user, role).await
593    }
594
595    /// Revoke role for an user.
596    #[inline]
597    pub async fn user_revoke_role(
598        &mut self,
599        user: impl Into<String>,
600        role: impl Into<String>,
601    ) -> Result<UserRevokeRoleResponse> {
602        self.auth.user_revoke_role(user, role).await
603    }
604
605    /// Maintain(get, active or inactive) alarms of members.
606    #[inline]
607    pub async fn alarm(
608        &mut self,
609        alarm_action: AlarmAction,
610        alarm_type: AlarmType,
611        options: Option<AlarmOptions>,
612    ) -> Result<AlarmResponse> {
613        self.maintenance
614            .alarm(alarm_action, alarm_type, options)
615            .await
616    }
617
618    /// Gets the status of a member.
619    #[inline]
620    pub async fn status(&mut self) -> Result<StatusResponse> {
621        self.maintenance.status().await
622    }
623
624    /// Defragments a member's backend database to recover storage space.
625    #[inline]
626    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
627        self.maintenance.defragment().await
628    }
629
630    /// Computes the hash of whole backend keyspace.
631    /// including key, lease, and other buckets in storage.
632    /// This is designed for testing ONLY!
633    #[inline]
634    pub async fn hash(&mut self) -> Result<HashResponse> {
635        self.maintenance.hash().await
636    }
637
638    /// Computes the hash of all MVCC keys up to a given revision.
639    /// It only iterates \"key\" bucket in backend storage.
640    #[inline]
641    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
642        self.maintenance.hash_kv(revision).await
643    }
644
645    /// Gets a snapshot of the entire backend from a member over a stream to a client.
646    #[inline]
647    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
648        self.maintenance.snapshot().await
649    }
650
651    /// Adds current connected server as a member.
652    #[inline]
653    pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
654        &mut self,
655        urls: S,
656        options: Option<MemberAddOptions>,
657    ) -> Result<MemberAddResponse> {
658        let mut eps = Vec::new();
659        for e in urls.as_ref() {
660            let e = e.as_ref();
661            let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
662                e.to_string()
663            } else {
664                HTTP_PREFIX.to_owned() + e
665            };
666            eps.push(url);
667        }
668
669        self.cluster.member_add(eps, options).await
670    }
671
672    /// Remove a member.
673    #[inline]
674    pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
675        self.cluster.member_remove(id).await
676    }
677
678    /// Updates the member.
679    #[inline]
680    pub async fn member_update(
681        &mut self,
682        id: u64,
683        url: impl Into<Vec<String>>,
684    ) -> Result<MemberUpdateResponse> {
685        self.cluster.member_update(id, url).await
686    }
687
688    /// Promotes the member.
689    #[inline]
690    pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
691        self.cluster.member_promote(id).await
692    }
693
694    /// Lists members.
695    #[inline]
696    pub async fn member_list(&mut self) -> Result<MemberListResponse> {
697        self.cluster.member_list().await
698    }
699
700    /// Moves the current leader node to target node.
701    #[inline]
702    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
703        self.maintenance.move_leader(target_id).await
704    }
705
706    /// Puts a value as eligible for the election on the prefix key.
707    /// Multiple sessions can participate in the election for the
708    /// same prefix, but only one can be the leader at a time.
709    #[inline]
710    pub async fn campaign(
711        &mut self,
712        name: impl Into<Vec<u8>>,
713        value: impl Into<Vec<u8>>,
714        lease: i64,
715    ) -> Result<CampaignResponse> {
716        self.election.campaign(name, value, lease).await
717    }
718
719    /// Lets the leader announce a new value without another election.
720    #[inline]
721    pub async fn proclaim(
722        &mut self,
723        value: impl Into<Vec<u8>>,
724        options: Option<ProclaimOptions>,
725    ) -> Result<ProclaimResponse> {
726        self.election.proclaim(value, options).await
727    }
728
729    /// Returns the leader value for the current election.
730    #[inline]
731    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
732        self.election.leader(name).await
733    }
734
735    /// Returns a channel that reliably observes ordered leader proposals
736    /// as GetResponse values on every current elected leader key.
737    #[inline]
738    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
739        self.election.observe(name).await
740    }
741
742    /// Releases election leadership and then start a new election
743    #[inline]
744    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
745        self.election.resign(option).await
746    }
747
748    /// Sets client-side authentication.
749    pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
750        self.auth.set_client_auth(name, password).await
751    }
752
753    /// Removes client-side authentication.
754    pub fn remove_client_auth(&mut self) {
755        self.auth.remove_client_auth();
756    }
757}
758
/// Options for `Connect` operation.
///
/// Build one with [`ConnectOptions::new`] (or `Default::default()`, which yields the same
/// value) and refine it with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct ConnectOptions {
    /// user is a pair values of name and password
    user: Option<(String, String)>,
    /// HTTP2 keep-alive: (keep_alive_interval, keep_alive_timeout)
    keep_alive: Option<(Duration, Duration)>,
    /// Whether send keep alive pings even there are no active streams.
    keep_alive_while_idle: bool,
    /// Apply a timeout to each gRPC request.
    timeout: Option<Duration>,
    /// Apply a timeout to connecting to the endpoint.
    connect_timeout: Option<Duration>,
    /// TCP keepalive.
    tcp_keepalive: Option<Duration>,
    /// TLS options applied to HTTPS endpoints.
    #[cfg(feature = "tls")]
    tls: Option<TlsOptions>,
    /// Result of building the OpenSSL connector; an error in it surfaces when connecting.
    #[cfg(feature = "tls-openssl")]
    otls: Option<OpenSslResult<OpenSslConnector>>,
    /// Require a leader to be present for the operation to complete.
    require_leader: bool,
}

impl Default for ConnectOptions {
    /// Returns the same value as [`ConnectOptions::new`].
    ///
    /// A derived `Default` would leave `keep_alive_while_idle` at `false` while `new()`
    /// sets it to `true`; delegating keeps both constructors in agreement.
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl ConnectOptions {
    /// Sets the user `name` and `password` used to authenticate when connecting.
    #[inline]
    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
        self.user = Some((name.into(), password.into()));
        self
    }

    /// Sets TLS options.
    ///
    /// Notes that this function have to work with `HTTPS` URLs.
    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
    #[cfg(feature = "tls")]
    #[inline]
    pub fn with_tls(mut self, tls: TlsOptions) -> Self {
        self.tls = Some(tls);
        self
    }

    /// Sets TLS options, however using the OpenSSL implementation.
    #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
    #[cfg(feature = "tls-openssl")]
    #[inline]
    pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
        // NOTE1: Perhaps we can unify the essential TLS config terms by something like `TlsBuilder`?
        //
        // NOTE2: we delay the checking at connection step to keep consistency with tonic, however would
        // things be better if we validate the config at here?
        self.otls = Some(otls.build());
        self
    }

    /// Enable HTTP2 keep-alive with `interval` and `timeout`.
    #[inline]
    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
        self.keep_alive = Some((interval, timeout));
        self
    }

    /// Apply a timeout to each request.
    #[inline]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Apply a timeout to connecting to the endpoint.
    #[inline]
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = Some(timeout);
        self
    }

    /// Enable TCP keepalive.
    #[inline]
    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
        self.tcp_keepalive = Some(tcp_keepalive);
        self
    }

    /// Whether send keep alive pings even there are no active requests.
    /// If disabled, keep-alive pings are only sent while there are opened request/response streams.
    /// If enabled, pings are also sent when no streams are active.
    /// NOTE: Some implementations of gRPC server may send GOAWAY if there are too many pings.
    ///       This would be useful if you meet some error like `too many pings`.
    #[inline]
    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
        self.keep_alive_while_idle = enabled;
        self
    }

    /// Whether to enforce that a leader be present in the etcd cluster.
    #[inline]
    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
        self.require_leader = require_leader;
        self
    }

    /// Creates a `ConnectOptions`.
    ///
    /// Every optional setting starts out unset, `keep_alive_while_idle` is `true` and
    /// `require_leader` is `false`.
    #[inline]
    pub const fn new() -> Self {
        ConnectOptions {
            user: None,
            keep_alive: None,
            keep_alive_while_idle: true,
            timeout: None,
            connect_timeout: None,
            tcp_keepalive: None,
            #[cfg(feature = "tls")]
            tls: None,
            #[cfg(feature = "tls-openssl")]
            otls: None,
            require_leader: false,
        }
    }
}