etcd_client/
client.rs

1//! Asynchronous client & synchronous client.
2
3use crate::channel::Channel;
4use crate::error::{Error, Result};
5#[cfg(feature = "tls-openssl")]
6use crate::openssl_tls::{self, OpenSslClientConfig, OpenSslConnector};
7use crate::rpc::auth::Permission;
8use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
9use crate::rpc::auth::{
10    RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
11    RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
12    UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
13    UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
14};
15use crate::rpc::cluster::{
16    ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
17    MemberRemoveResponse, MemberUpdateResponse,
18};
19use crate::rpc::election::{
20    CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
21    ProclaimResponse, ResignOptions, ResignResponse,
22};
23use crate::rpc::kv::{
24    CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
25    KvClient, PutOptions, PutResponse, Txn, TxnResponse,
26};
27use crate::rpc::lease::{
28    LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
29    LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
30};
31use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
32use crate::rpc::maintenance::{
33    AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
34    HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
35};
36use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
37#[cfg(feature = "tls-openssl")]
38use crate::OpenSslResult;
39#[cfg(feature = "tls")]
40use crate::TlsOptions;
41use http::uri::Uri;
42use http::HeaderValue;
43
44use std::str::FromStr;
45use std::sync::{Arc, RwLock};
46use std::time::Duration;
47use tokio::sync::mpsc::Sender;
48
49use tonic::transport::Endpoint;
50
51use tower::discover::Change;
52
53const HTTP_PREFIX: &str = "http://";
54const HTTPS_PREFIX: &str = "https://";
55
56/// Asynchronous `etcd` client using v3 API.
57#[derive(Clone)]
58pub struct Client {
59    kv: KvClient,
60    watch: WatchClient,
61    lease: LeaseClient,
62    lock: LockClient,
63    auth: AuthClient,
64    maintenance: MaintenanceClient,
65    cluster: ClusterClient,
66    election: ElectionClient,
67    options: Option<ConnectOptions>,
68    tx: Sender<Change<Uri, Endpoint>>,
69}
70
71impl Client {
72    /// Connect to `etcd` servers from given `endpoints`.
73    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
74        endpoints: S,
75        options: Option<ConnectOptions>,
76    ) -> Result<Self> {
77        let endpoints = {
78            let mut eps = Vec::new();
79            for e in endpoints.as_ref() {
80                let channel = Self::build_endpoint(e.as_ref(), &options)?;
81                eps.push(channel);
82            }
83            eps
84        };
85
86        if endpoints.is_empty() {
87            return Err(Error::InvalidArgs(String::from("empty endpoints")));
88        }
89
90        // Always use balance strategy even if there is only one endpoint.
91        #[cfg(not(feature = "tls-openssl"))]
92        let (channel, tx) = Channel::balance_channel(64);
93        #[cfg(feature = "tls-openssl")]
94        let (channel, tx) = openssl_tls::balanced_channel(
95            options
96                .clone()
97                .and_then(|o| o.otls)
98                .unwrap_or_else(OpenSslConnector::create_default)?,
99        )?;
100        for endpoint in endpoints {
101            // The rx inside `channel` won't be closed or dropped here
102            tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
103                .await
104                .unwrap();
105        }
106
107        let mut options = options;
108
109        let auth_token = Arc::new(RwLock::new(None));
110        Self::auth(channel.clone(), &mut options, &auth_token).await?;
111
112        Ok(Self::build_client(channel, tx, auth_token, options))
113    }
114
115    fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
116        #[cfg(feature = "tls-openssl")]
117        use tonic::transport::Channel;
118        let mut endpoint = if url.starts_with(HTTP_PREFIX) {
119            #[cfg(feature = "tls")]
120            if let Some(connect_options) = options {
121                if connect_options.tls.is_some() {
122                    return Err(Error::InvalidArgs(String::from(
123                        "TLS options are only supported with HTTPS URLs",
124                    )));
125                }
126            }
127
128            Channel::builder(url.parse()?)
129        } else if url.starts_with(HTTPS_PREFIX) {
130            #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
131            return Err(Error::InvalidArgs(String::from(
132                "HTTPS URLs are only supported with the feature \"tls\"",
133            )));
134
135            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
136            {
137                Channel::builder(url.parse()?)
138            }
139
140            #[cfg(feature = "tls")]
141            {
142                let tls = if let Some(connect_options) = options {
143                    connect_options.tls.clone()
144                } else {
145                    None
146                }
147                .unwrap_or_else(TlsOptions::new);
148
149                Channel::builder(url.parse()?).tls_config(tls)?
150            }
151        } else {
152            #[cfg(feature = "tls")]
153            {
154                let tls = if let Some(connect_options) = options {
155                    connect_options.tls.clone()
156                } else {
157                    None
158                };
159
160                match tls {
161                    Some(tls) => {
162                        let e = HTTPS_PREFIX.to_owned() + url;
163                        Channel::builder(e.parse()?).tls_config(tls)?
164                    }
165                    None => {
166                        let e = HTTP_PREFIX.to_owned() + url;
167                        Channel::builder(e.parse()?)
168                    }
169                }
170            }
171
172            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
173            {
174                let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
175                    HTTPS_PREFIX
176                } else {
177                    HTTP_PREFIX
178                };
179                let e = pfx.to_owned() + url;
180                Channel::builder(e.parse()?)
181            }
182
183            #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
184            {
185                let e = HTTP_PREFIX.to_owned() + url;
186                Channel::builder(e.parse()?)
187            }
188        };
189
190        if let Some(opts) = options {
191            if let Some((interval, timeout)) = opts.keep_alive {
192                endpoint = endpoint
193                    .keep_alive_while_idle(opts.keep_alive_while_idle)
194                    .http2_keep_alive_interval(interval)
195                    .keep_alive_timeout(timeout);
196            }
197
198            if let Some(timeout) = opts.timeout {
199                endpoint = endpoint.timeout(timeout);
200            }
201
202            if let Some(timeout) = opts.connect_timeout {
203                endpoint = endpoint.connect_timeout(timeout);
204            }
205
206            if let Some(tcp_keepalive) = opts.tcp_keepalive {
207                endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
208            }
209        }
210
211        Ok(endpoint)
212    }
213
214    async fn auth(
215        channel: Channel,
216        options: &mut Option<ConnectOptions>,
217        auth_token: &Arc<RwLock<Option<HeaderValue>>>,
218    ) -> Result<()> {
219        let user = match options {
220            None => return Ok(()),
221            Some(opt) => {
222                // Take away the user, the password should not be stored in client.
223                opt.user.take()
224            }
225        };
226
227        if let Some((name, password)) = user {
228            let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
229            let resp = tmp_auth.authenticate(name, password).await?;
230            auth_token.write().unwrap().replace(resp.token().parse()?);
231        }
232
233        Ok(())
234    }
235
236    fn build_client(
237        channel: Channel,
238        tx: Sender<Change<Uri, Endpoint>>,
239        auth_token: Arc<RwLock<Option<HeaderValue>>>,
240        options: Option<ConnectOptions>,
241    ) -> Self {
242        let kv = KvClient::new(channel.clone(), auth_token.clone());
243        let watch = WatchClient::new(channel.clone(), auth_token.clone());
244        let lease = LeaseClient::new(channel.clone(), auth_token.clone());
245        let lock = LockClient::new(channel.clone(), auth_token.clone());
246        let auth = AuthClient::new(channel.clone(), auth_token.clone());
247        let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
248        let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
249        let election = ElectionClient::new(channel, auth_token);
250
251        Self {
252            kv,
253            watch,
254            lease,
255            lock,
256            auth,
257            maintenance,
258            cluster,
259            election,
260            options,
261            tx,
262        }
263    }
264
265    /// Dynamically add an endpoint to the client.
266    ///
267    /// Which can be used to add a new member to the underlying balance cache.
268    /// The typical scenario is that application can use a services discovery
269    /// to discover the member list changes and add/remove them to/from the client.
270    ///
271    /// Note that the [`Client`] doesn't check the authentication before added.
272    /// So the etcd member of the added endpoint REQUIRES to use the same auth
273    /// token as when create the client. Otherwise, the underlying balance
274    /// services will not be able to connect to the new endpoint.
275    #[inline]
276    pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
277        let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
278        let tx = &self.tx;
279        tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
280            .await
281            .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {}", e)))
282    }
283
284    /// Dynamically remove an endpoint from the client.
285    ///
286    /// Note that the `endpoint` str should be the same as it was added.
287    /// And the underlying balance services cache used the hash from the Uri,
288    /// which was parsed from `endpoint` str, to do the equality comparisons.
289    #[inline]
290    pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
291        let uri = http::Uri::from_str(endpoint.as_ref())?;
292        let tx = &self.tx;
293        tx.send(Change::Remove(uri)).await.map_err(|e| {
294            Error::EndpointError(format!("failed to remove endpoint because of {}", e))
295        })
296    }
297
298    /// Gets a KV client.
299    #[inline]
300    pub fn kv_client(&self) -> KvClient {
301        self.kv.clone()
302    }
303
304    /// Gets a watch client.
305    #[inline]
306    pub fn watch_client(&self) -> WatchClient {
307        self.watch.clone()
308    }
309
310    /// Gets a lease client.
311    #[inline]
312    pub fn lease_client(&self) -> LeaseClient {
313        self.lease.clone()
314    }
315
316    /// Gets an auth client.
317    #[inline]
318    pub fn auth_client(&self) -> AuthClient {
319        self.auth.clone()
320    }
321
322    /// Gets a maintenance client.
323    #[inline]
324    pub fn maintenance_client(&self) -> MaintenanceClient {
325        self.maintenance.clone()
326    }
327
328    /// Gets a cluster client.
329    #[inline]
330    pub fn cluster_client(&self) -> ClusterClient {
331        self.cluster.clone()
332    }
333
334    /// Gets a lock client.
335    #[inline]
336    pub fn lock_client(&self) -> LockClient {
337        self.lock.clone()
338    }
339
340    /// Gets a election client.
341    #[inline]
342    pub fn election_client(&self) -> ElectionClient {
343        self.election.clone()
344    }
345
346    /// Put the given key into the key-value store.
347    /// A put request increments the revision of the key-value store
348    /// and generates one event in the event history.
349    #[inline]
350    pub async fn put(
351        &mut self,
352        key: impl Into<Vec<u8>>,
353        value: impl Into<Vec<u8>>,
354        options: Option<PutOptions>,
355    ) -> Result<PutResponse> {
356        self.kv.put(key, value, options).await
357    }
358
359    /// Gets the key from the key-value store.
360    #[inline]
361    pub async fn get(
362        &mut self,
363        key: impl Into<Vec<u8>>,
364        options: Option<GetOptions>,
365    ) -> Result<GetResponse> {
366        self.kv.get(key, options).await
367    }
368
369    /// Deletes the given key from the key-value store.
370    #[inline]
371    pub async fn delete(
372        &mut self,
373        key: impl Into<Vec<u8>>,
374        options: Option<DeleteOptions>,
375    ) -> Result<DeleteResponse> {
376        self.kv.delete(key, options).await
377    }
378
379    /// Compacts the event history in the etcd key-value store. The key-value
380    /// store should be periodically compacted or the event history will continue to grow
381    /// indefinitely.
382    #[inline]
383    pub async fn compact(
384        &mut self,
385        revision: i64,
386        options: Option<CompactionOptions>,
387    ) -> Result<CompactionResponse> {
388        self.kv.compact(revision, options).await
389    }
390
391    /// Processes multiple operations in a single transaction.
392    /// A txn request increments the revision of the key-value store
393    /// and generates events with the same revision for every completed operation.
394    /// It is not allowed to modify the same key several times within one txn.
395    #[inline]
396    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
397        self.kv.txn(txn).await
398    }
399
400    /// Watches for events happening or that have happened. Both input and output
401    /// are streams; the input stream is for creating and canceling watcher and the output
402    /// stream sends events. The entire event history can be watched starting from the
403    /// last compaction revision.
404    #[inline]
405    pub async fn watch(
406        &mut self,
407        key: impl Into<Vec<u8>>,
408        options: Option<WatchOptions>,
409    ) -> Result<(Watcher, WatchStream)> {
410        self.watch.watch(key, options).await
411    }
412
413    /// Creates a lease which expires if the server does not receive a keepAlive
414    /// within a given time to live period. All keys attached to the lease will be expired and
415    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
416    #[inline]
417    pub async fn lease_grant(
418        &mut self,
419        ttl: i64,
420        options: Option<LeaseGrantOptions>,
421    ) -> Result<LeaseGrantResponse> {
422        self.lease.grant(ttl, options).await
423    }
424
425    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
426    #[inline]
427    pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
428        self.lease.revoke(id).await
429    }
430
431    /// Keeps the lease alive by streaming keep alive requests from the client
432    /// to the server and streaming keep alive responses from the server to the client.
433    #[inline]
434    pub async fn lease_keep_alive(
435        &mut self,
436        id: i64,
437    ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
438        self.lease.keep_alive(id).await
439    }
440
441    /// Retrieves lease information.
442    #[inline]
443    pub async fn lease_time_to_live(
444        &mut self,
445        id: i64,
446        options: Option<LeaseTimeToLiveOptions>,
447    ) -> Result<LeaseTimeToLiveResponse> {
448        self.lease.time_to_live(id, options).await
449    }
450
451    /// Lists all existing leases.
452    #[inline]
453    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
454        self.lease.leases().await
455    }
456
457    /// Lock acquires a distributed shared lock on a given named lock.
458    /// On success, it will return a unique key that exists so long as the
459    /// lock is held by the caller. This key can be used in conjunction with
460    /// transactions to safely ensure updates to etcd only occur while holding
461    /// lock ownership. The lock is held until Unlock is called on the key or the
462    /// lease associate with the owner expires.
463    #[inline]
464    pub async fn lock(
465        &mut self,
466        name: impl Into<Vec<u8>>,
467        options: Option<LockOptions>,
468    ) -> Result<LockResponse> {
469        self.lock.lock(name, options).await
470    }
471
472    /// Unlock takes a key returned by Lock and releases the hold on lock. The
473    /// next Lock caller waiting for the lock will then be woken up and given
474    /// ownership of the lock.
475    #[inline]
476    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
477        self.lock.unlock(key).await
478    }
479
480    /// Enables authentication.
481    #[inline]
482    pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
483        self.auth.auth_enable().await
484    }
485
486    /// Disables authentication.
487    #[inline]
488    pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
489        self.auth.auth_disable().await
490    }
491
492    /// Adds role.
493    #[inline]
494    pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
495        self.auth.role_add(name).await
496    }
497
498    /// Deletes role.
499    #[inline]
500    pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
501        self.auth.role_delete(name).await
502    }
503
504    /// Gets role.
505    #[inline]
506    pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
507        self.auth.role_get(name).await
508    }
509
510    /// Lists role.
511    #[inline]
512    pub async fn role_list(&mut self) -> Result<RoleListResponse> {
513        self.auth.role_list().await
514    }
515
516    /// Grants role permission.
517    #[inline]
518    pub async fn role_grant_permission(
519        &mut self,
520        name: impl Into<String>,
521        perm: Permission,
522    ) -> Result<RoleGrantPermissionResponse> {
523        self.auth.role_grant_permission(name, perm).await
524    }
525
526    /// Revokes role permission.
527    #[inline]
528    pub async fn role_revoke_permission(
529        &mut self,
530        name: impl Into<String>,
531        key: impl Into<Vec<u8>>,
532        options: Option<RoleRevokePermissionOptions>,
533    ) -> Result<RoleRevokePermissionResponse> {
534        self.auth.role_revoke_permission(name, key, options).await
535    }
536
537    /// Add an user.
538    #[inline]
539    pub async fn user_add(
540        &mut self,
541        name: impl Into<String>,
542        password: impl Into<String>,
543        options: Option<UserAddOptions>,
544    ) -> Result<UserAddResponse> {
545        self.auth.user_add(name, password, options).await
546    }
547
548    /// Gets the user info by the user name.
549    #[inline]
550    pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
551        self.auth.user_get(name).await
552    }
553
554    /// Lists all users.
555    #[inline]
556    pub async fn user_list(&mut self) -> Result<UserListResponse> {
557        self.auth.user_list().await
558    }
559
560    /// Deletes the given key from the key-value store.
561    #[inline]
562    pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
563        self.auth.user_delete(name).await
564    }
565
566    /// Change password for an user.
567    #[inline]
568    pub async fn user_change_password(
569        &mut self,
570        name: impl Into<String>,
571        password: impl Into<String>,
572    ) -> Result<UserChangePasswordResponse> {
573        self.auth.user_change_password(name, password).await
574    }
575
576    /// Grant role for an user.
577    #[inline]
578    pub async fn user_grant_role(
579        &mut self,
580        user: impl Into<String>,
581        role: impl Into<String>,
582    ) -> Result<UserGrantRoleResponse> {
583        self.auth.user_grant_role(user, role).await
584    }
585
586    /// Revoke role for an user.
587    #[inline]
588    pub async fn user_revoke_role(
589        &mut self,
590        user: impl Into<String>,
591        role: impl Into<String>,
592    ) -> Result<UserRevokeRoleResponse> {
593        self.auth.user_revoke_role(user, role).await
594    }
595
596    /// Maintain(get, active or inactive) alarms of members.
597    #[inline]
598    pub async fn alarm(
599        &mut self,
600        alarm_action: AlarmAction,
601        alarm_type: AlarmType,
602        options: Option<AlarmOptions>,
603    ) -> Result<AlarmResponse> {
604        self.maintenance
605            .alarm(alarm_action, alarm_type, options)
606            .await
607    }
608
609    /// Gets the status of a member.
610    #[inline]
611    pub async fn status(&mut self) -> Result<StatusResponse> {
612        self.maintenance.status().await
613    }
614
615    /// Defragments a member's backend database to recover storage space.
616    #[inline]
617    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
618        self.maintenance.defragment().await
619    }
620
621    /// Computes the hash of whole backend keyspace.
622    /// including key, lease, and other buckets in storage.
623    /// This is designed for testing ONLY!
624    #[inline]
625    pub async fn hash(&mut self) -> Result<HashResponse> {
626        self.maintenance.hash().await
627    }
628
629    /// Computes the hash of all MVCC keys up to a given revision.
630    /// It only iterates \"key\" bucket in backend storage.
631    #[inline]
632    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
633        self.maintenance.hash_kv(revision).await
634    }
635
636    /// Gets a snapshot of the entire backend from a member over a stream to a client.
637    #[inline]
638    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
639        self.maintenance.snapshot().await
640    }
641
642    /// Adds current connected server as a member.
643    #[inline]
644    pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
645        &mut self,
646        urls: S,
647        options: Option<MemberAddOptions>,
648    ) -> Result<MemberAddResponse> {
649        let mut eps = Vec::new();
650        for e in urls.as_ref() {
651            let e = e.as_ref();
652            let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
653                e.to_string()
654            } else {
655                HTTP_PREFIX.to_owned() + e
656            };
657            eps.push(url);
658        }
659
660        self.cluster.member_add(eps, options).await
661    }
662
663    /// Remove a member.
664    #[inline]
665    pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
666        self.cluster.member_remove(id).await
667    }
668
669    /// Updates the member.
670    #[inline]
671    pub async fn member_update(
672        &mut self,
673        id: u64,
674        url: impl Into<Vec<String>>,
675    ) -> Result<MemberUpdateResponse> {
676        self.cluster.member_update(id, url).await
677    }
678
679    /// Promotes the member.
680    #[inline]
681    pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
682        self.cluster.member_promote(id).await
683    }
684
685    /// Lists members.
686    #[inline]
687    pub async fn member_list(&mut self) -> Result<MemberListResponse> {
688        self.cluster.member_list().await
689    }
690
691    /// Moves the current leader node to target node.
692    #[inline]
693    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
694        self.maintenance.move_leader(target_id).await
695    }
696
697    /// Puts a value as eligible for the election on the prefix key.
698    /// Multiple sessions can participate in the election for the
699    /// same prefix, but only one can be the leader at a time.
700    #[inline]
701    pub async fn campaign(
702        &mut self,
703        name: impl Into<Vec<u8>>,
704        value: impl Into<Vec<u8>>,
705        lease: i64,
706    ) -> Result<CampaignResponse> {
707        self.election.campaign(name, value, lease).await
708    }
709
710    /// Lets the leader announce a new value without another election.
711    #[inline]
712    pub async fn proclaim(
713        &mut self,
714        value: impl Into<Vec<u8>>,
715        options: Option<ProclaimOptions>,
716    ) -> Result<ProclaimResponse> {
717        self.election.proclaim(value, options).await
718    }
719
720    /// Returns the leader value for the current election.
721    #[inline]
722    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
723        self.election.leader(name).await
724    }
725
726    /// Returns a channel that reliably observes ordered leader proposals
727    /// as GetResponse values on every current elected leader key.
728    #[inline]
729    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
730        self.election.observe(name).await
731    }
732
733    /// Releases election leadership and then start a new election
734    #[inline]
735    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
736        self.election.resign(option).await
737    }
738
739    /// Sets client-side authentication.
740    pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
741        self.auth.set_client_auth(name, password).await
742    }
743
744    /// Removes client-side authentication.
745    pub fn remove_client_auth(&mut self) {
746        self.auth.remove_client_auth();
747    }
748}
749
750/// Options for `Connect` operation.
751#[derive(Debug, Default, Clone)]
752pub struct ConnectOptions {
753    /// user is a pair values of name and password
754    user: Option<(String, String)>,
755    /// HTTP2 keep-alive: (keep_alive_interval, keep_alive_timeout)
756    keep_alive: Option<(Duration, Duration)>,
757    /// Whether send keep alive pings even there are no active streams.
758    keep_alive_while_idle: bool,
759    /// Apply a timeout to each gRPC request.
760    timeout: Option<Duration>,
761    /// Apply a timeout to connecting to the endpoint.
762    connect_timeout: Option<Duration>,
763    /// TCP keepalive.
764    tcp_keepalive: Option<Duration>,
765    #[cfg(feature = "tls")]
766    tls: Option<TlsOptions>,
767    #[cfg(feature = "tls-openssl")]
768    otls: Option<OpenSslResult<OpenSslConnector>>,
769}
770
771impl ConnectOptions {
772    /// name is the identifier for the distributed shared lock to be acquired.
773    #[inline]
774    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
775        self.user = Some((name.into(), password.into()));
776        self
777    }
778
779    /// Sets TLS options.
780    ///
781    /// Notes that this function have to work with `HTTPS` URLs.
782    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
783    #[cfg(feature = "tls")]
784    #[inline]
785    pub fn with_tls(mut self, tls: TlsOptions) -> Self {
786        self.tls = Some(tls);
787        self
788    }
789
790    /// Sets TLS options, however using the OpenSSL implementation.
791    #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
792    #[cfg(feature = "tls-openssl")]
793    #[inline]
794    pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
795        // NOTE1: Perhaps we can unify the essential TLS config terms by something like `TlsBuilder`?
796        //
797        // NOTE2: we delay the checking at connection step to keep consistency with tonic, however would
798        // things be better if we validate the config at here?
799        self.otls = Some(otls.build());
800        self
801    }
802
803    /// Enable HTTP2 keep-alive with `interval` and `timeout`.
804    #[inline]
805    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
806        self.keep_alive = Some((interval, timeout));
807        self
808    }
809
810    /// Apply a timeout to each request.
811    #[inline]
812    pub fn with_timeout(mut self, timeout: Duration) -> Self {
813        self.timeout = Some(timeout);
814        self
815    }
816
817    /// Apply a timeout to connecting to the endpoint.
818    #[inline]
819    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
820        self.connect_timeout = Some(timeout);
821        self
822    }
823
824    /// Enable TCP keepalive.
825    #[inline]
826    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
827        self.tcp_keepalive = Some(tcp_keepalive);
828        self
829    }
830
831    /// Whether send keep alive pings even there are no active requests.
832    /// If disabled, keep-alive pings are only sent while there are opened request/response streams.
833    /// If enabled, pings are also sent when no streams are active.
834    /// NOTE: Some implementations of gRPC server may send GOAWAY if there are too many pings.
835    ///       This would be useful if you meet some error like `too many pings`.
836    #[inline]
837    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
838        self.keep_alive_while_idle = enabled;
839        self
840    }
841
842    /// Creates a `ConnectOptions`.
843    #[inline]
844    pub const fn new() -> Self {
845        ConnectOptions {
846            user: None,
847            keep_alive: None,
848            keep_alive_while_idle: true,
849            timeout: None,
850            connect_timeout: None,
851            tcp_keepalive: None,
852            #[cfg(feature = "tls")]
853            tls: None,
854            #[cfg(feature = "tls-openssl")]
855            otls: None,
856        }
857    }
858}