etcd_client/
client.rs

1//! Asynchronous client & synchronous client.
2
3#[cfg(feature = "raw-channel")]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13    RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14    RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15    UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16    UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19    ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20    MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23    CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24    ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27    CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28    KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31    LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32    LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36    AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37    HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use tonic::metadata::{Ascii, MetadataValue};
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::{channel::Change, Endpoint};
53
/// Scheme prefix of plain-text endpoints; also prepended to scheme-less URLs
/// when no TLS configuration applies.
const HTTP_PREFIX: &str = "http://";
/// Scheme prefix of TLS endpoints; prepended to scheme-less URLs when TLS
/// options are configured.
const HTTPS_PREFIX: &str = "https://";
56
/// Asynchronous `etcd` client using v3 API.
///
/// The client bundles one sub-client per etcd service. All sub-clients are built
/// from the same intercepted channel (see `build_client`), and the whole struct is
/// `Clone`.
#[derive(Clone)]
pub struct Client {
    /// Key-value service client.
    kv: KvClient,
    /// Watch service client.
    watch: WatchClient,
    /// Lease service client.
    lease: LeaseClient,
    /// Lock service client.
    lock: LockClient,
    /// Auth service client.
    auth: AuthClient,
    /// Maintenance service client.
    maintenance: MaintenanceClient,
    /// Cluster (membership) service client.
    cluster: ClusterClient,
    /// Election service client.
    election: ElectionClient,
    /// Options the client was created with; reused when endpoints are added later
    /// and when the auth token is refreshed.
    options: ConnectOptions,
    /// Sender used to insert/remove endpoints of the balanced channel.
    /// `None` when the client was created from a raw channel.
    tx: Option<Sender<Change<Uri, Endpoint>>>,
    /// Current auth token, shared with the request interceptor.
    auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
}
72
73impl Client {
74    /// Connect to `etcd` servers from given `endpoints`.
75    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
76        endpoints: S,
77        options: Option<ConnectOptions>,
78    ) -> Result<Self> {
79        #[cfg(not(feature = "tls-openssl"))]
80        let make_balanced_channel = crate::channel::Tonic;
81        #[cfg(feature = "tls-openssl")]
82        let make_balanced_channel = crate::channel::Openssl {
83            conn: options
84                .clone()
85                .and_then(|o| o.otls)
86                .unwrap_or_else(OpenSslConnector::create_default)?,
87        };
88        Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
89    }
90
91    /// Connect to `etcd` servers from given `endpoints` and a balanced channel.
92    pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
93        endpoints: S,
94        options: Option<ConnectOptions>,
95        make_balanced_channel: MBC,
96    ) -> Result<Self>
97    where
98        MBC: crate::channel::BalancedChannelBuilder,
99        crate::error::Error: From<MBC::Error>,
100    {
101        let options = options.unwrap_or_default();
102        let endpoints = {
103            let mut eps = Vec::new();
104            for e in endpoints.as_ref() {
105                let channel = Self::build_endpoint(e.as_ref(), &options)?;
106                eps.push(channel);
107            }
108            eps
109        };
110
111        if endpoints.is_empty() {
112            return Err(Error::InvalidArgs(String::from("empty endpoints")));
113        }
114
115        let auth_token = Arc::new(RwLock::new(None));
116
117        // Always use balance strategy even if there is only one endpoint.
118        let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
119        let channel = InterceptedChannel::new(
120            channel,
121            Interceptor {
122                require_leader: options.require_leader,
123                auth_token: auth_token.clone(),
124            },
125        );
126        for endpoint in endpoints {
127            // The rx inside `channel` may be closed or error, e.g. the balanced service is
128            // openssl based and the openssl connector is misconfigured, the send here may fail.
129            tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
130                .await
131                .map_err(|_| {
132                    Error::Internal("failed to insert endpoint into the balanced channel".into())
133                })?;
134        }
135
136        let client = Self::build_client(channel, Some(tx), auth_token, options);
137        client.refresh_token().await?;
138        Ok(client)
139    }
140
141    #[cfg(feature = "raw-channel")]
142    /// Connect to `etcd` servers represented by the given `channel`.
143    pub async fn from_channel(channel: Channel, options: Option<ConnectOptions>) -> Result<Self> {
144        let options = options.unwrap_or_default();
145        let auth_token = Arc::new(RwLock::new(None));
146        let channel = InterceptedChannel::new(
147            channel,
148            Interceptor {
149                require_leader: options.require_leader,
150                auth_token: auth_token.clone(),
151            },
152        );
153
154        let client = Self::build_client(channel, None, auth_token, options);
155        client.refresh_token().await?;
156        Ok(client)
157    }
158
    /// Builds a tonic `Endpoint` for `url`, applying the connection `options`.
    ///
    /// The scheme handling depends on the enabled TLS features:
    /// - `http://` URLs are used as-is; with the `tls` feature they are rejected
    ///   when TLS options are set.
    /// - `https://` URLs are rejected when neither `tls` nor `tls-openssl` is
    ///   enabled; with `tls` the configured (or default) TLS options are applied.
    /// - URLs without either prefix get `https://` prepended when TLS options are
    ///   configured and `http://` otherwise.
    ///
    /// Keep-alive, request timeout, connect timeout and TCP keepalive are applied
    /// afterwards when present in `options`.
    ///
    /// # Errors
    ///
    /// Returns an error when the URL fails to parse, when the TLS configuration is
    /// rejected, or for the scheme/feature mismatches described above.
    fn build_endpoint(url: &str, options: &ConnectOptions) -> Result<Endpoint> {
        use tonic::transport::Channel as TonicChannel;
        let mut endpoint = if url.starts_with(HTTP_PREFIX) {
            // Plain HTTP combined with explicit TLS options is treated as a caller mistake.
            #[cfg(feature = "tls")]
            if options.tls.is_some() {
                return Err(Error::InvalidArgs(String::from(
                    "TLS options are only supported with HTTPS URLs",
                )));
            }

            TonicChannel::builder(url.parse()?)
        } else if url.starts_with(HTTPS_PREFIX) {
            // Exactly one of the following cfg-gated items is compiled in.
            #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
            return Err(Error::InvalidArgs(String::from(
                "HTTPS URLs are only supported with the feature \"tls\"",
            )));

            // OpenSSL only: no tonic TLS config is set on the endpoint here.
            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
            {
                TonicChannel::builder(url.parse()?)
            }

            #[cfg(feature = "tls")]
            {
                let tls = options.tls.clone().unwrap_or_default();
                TonicChannel::builder(url.parse()?).tls_config(tls)?
            }
        } else {
            // No scheme given: infer it from whether TLS options were supplied.
            #[cfg(feature = "tls")]
            {
                let tls = options.tls.clone();

                match tls {
                    Some(tls) => {
                        let e = HTTPS_PREFIX.to_owned() + url;
                        TonicChannel::builder(e.parse()?).tls_config(tls)?
                    }
                    None => {
                        let e = HTTP_PREFIX.to_owned() + url;
                        TonicChannel::builder(e.parse()?)
                    }
                }
            }

            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
            {
                let pfx = if options.otls.as_ref().is_some() {
                    HTTPS_PREFIX
                } else {
                    HTTP_PREFIX
                };
                let e = pfx.to_owned() + url;
                TonicChannel::builder(e.parse()?)
            }

            #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
            {
                let e = HTTP_PREFIX.to_owned() + url;
                TonicChannel::builder(e.parse()?)
            }
        };

        // `keep_alive_while_idle` only takes effect when HTTP2 keep-alive is configured.
        if let Some((interval, timeout)) = options.keep_alive {
            endpoint = endpoint
                .keep_alive_while_idle(options.keep_alive_while_idle)
                .http2_keep_alive_interval(interval)
                .keep_alive_timeout(timeout);
        }

        if let Some(timeout) = options.timeout {
            endpoint = endpoint.timeout(timeout);
        }

        if let Some(timeout) = options.connect_timeout {
            endpoint = endpoint.connect_timeout(timeout);
        }

        if let Some(tcp_keepalive) = options.tcp_keepalive {
            endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
        }

        Ok(endpoint)
    }
242
243    fn build_client(
244        channel: InterceptedChannel,
245        tx: Option<Sender<Change<Uri, Endpoint>>>,
246        auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
247        options: ConnectOptions,
248    ) -> Self {
249        let kv = KvClient::new(channel.clone());
250        let watch = WatchClient::new(channel.clone());
251        let lease = LeaseClient::new(channel.clone());
252        let lock = LockClient::new(channel.clone());
253        let auth = AuthClient::new(channel.clone());
254        let cluster = ClusterClient::new(channel.clone());
255        let maintenance = MaintenanceClient::new(channel.clone());
256        let election = ElectionClient::new(channel);
257
258        Self {
259            kv,
260            watch,
261            lease,
262            lock,
263            auth,
264            maintenance,
265            cluster,
266            election,
267            options,
268            tx,
269            auth_token,
270        }
271    }
272
273    /// Dynamically add an endpoint to the client.
274    ///
275    /// Which can be used to add a new member to the underlying balance cache.
276    /// The typical scenario is that application can use a services discovery
277    /// to discover the member list changes and add/remove them to/from the client.
278    ///
279    /// Note that the [`Client`] doesn't check the authentication before added.
280    /// So the etcd member of the added endpoint REQUIRES to use the same auth
281    /// token as when create the client. Otherwise, the underlying balance
282    /// services will not be able to connect to the new endpoint.
283    #[inline]
284    pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
285        let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
286        let Some(tx) = &self.tx else {
287            return Err(Error::EndpointsNotManaged);
288        };
289        tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
290            .await
291            .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {e}")))
292    }
293
294    /// Dynamically remove an endpoint from the client.
295    ///
296    /// Note that the `endpoint` str should be the same as it was added.
297    /// And the underlying balance services cache used the hash from the Uri,
298    /// which was parsed from `endpoint` str, to do the equality comparisons.
299    #[inline]
300    pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
301        let uri = http::Uri::from_str(endpoint.as_ref())?;
302        let Some(tx) = &self.tx else {
303            return Err(Error::EndpointsNotManaged);
304        };
305        tx.send(Change::Remove(uri))
306            .await
307            .map_err(|e| Error::EndpointError(format!("failed to remove endpoint because of {e}")))
308    }
309
310    /// Gets a KV client.
311    #[inline]
312    pub fn kv_client(&self) -> KvClient {
313        self.kv.clone()
314    }
315
316    /// Gets a watch client.
317    #[inline]
318    pub fn watch_client(&self) -> WatchClient {
319        self.watch.clone()
320    }
321
322    /// Gets a lease client.
323    #[inline]
324    pub fn lease_client(&self) -> LeaseClient {
325        self.lease.clone()
326    }
327
328    /// Gets an auth client.
329    #[inline]
330    pub fn auth_client(&self) -> AuthClient {
331        self.auth.clone()
332    }
333
334    /// Gets a maintenance client.
335    #[inline]
336    pub fn maintenance_client(&self) -> MaintenanceClient {
337        self.maintenance.clone()
338    }
339
340    /// Gets a cluster client.
341    #[inline]
342    pub fn cluster_client(&self) -> ClusterClient {
343        self.cluster.clone()
344    }
345
346    /// Gets a lock client.
347    #[inline]
348    pub fn lock_client(&self) -> LockClient {
349        self.lock.clone()
350    }
351
352    /// Gets a election client.
353    #[inline]
354    pub fn election_client(&self) -> ElectionClient {
355        self.election.clone()
356    }
357
358    /// Put the given key into the key-value store.
359    /// A put request increments the revision of the key-value store
360    /// and generates one event in the event history.
361    #[inline]
362    pub async fn put(
363        &mut self,
364        key: impl Into<Vec<u8>>,
365        value: impl Into<Vec<u8>>,
366        options: Option<PutOptions>,
367    ) -> Result<PutResponse> {
368        self.kv.put(key, value, options).await
369    }
370
371    /// Gets the key from the key-value store.
372    #[inline]
373    pub async fn get(
374        &mut self,
375        key: impl Into<Vec<u8>>,
376        options: Option<GetOptions>,
377    ) -> Result<GetResponse> {
378        self.kv.get(key, options).await
379    }
380
381    /// Deletes the given key from the key-value store.
382    #[inline]
383    pub async fn delete(
384        &mut self,
385        key: impl Into<Vec<u8>>,
386        options: Option<DeleteOptions>,
387    ) -> Result<DeleteResponse> {
388        self.kv.delete(key, options).await
389    }
390
391    /// Compacts the event history in the etcd key-value store. The key-value
392    /// store should be periodically compacted or the event history will continue to grow
393    /// indefinitely.
394    #[inline]
395    pub async fn compact(
396        &mut self,
397        revision: i64,
398        options: Option<CompactionOptions>,
399    ) -> Result<CompactionResponse> {
400        self.kv.compact(revision, options).await
401    }
402
403    /// Processes multiple operations in a single transaction.
404    /// A txn request increments the revision of the key-value store
405    /// and generates events with the same revision for every completed operation.
406    /// It is not allowed to modify the same key several times within one txn.
407    #[inline]
408    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
409        self.kv.txn(txn).await
410    }
411
412    /// Watches for events happening or that have happened. Both input and output
413    /// are streams; the input stream is for creating and canceling watcher and the output
414    /// stream sends events. The entire event history can be watched starting from the
415    /// last compaction revision.
416    #[inline]
417    pub async fn watch(
418        &mut self,
419        key: impl Into<Vec<u8>>,
420        options: Option<WatchOptions>,
421    ) -> Result<WatchStream> {
422        self.watch.watch(key, options).await
423    }
424
425    /// Creates a lease which expires if the server does not receive a keepAlive
426    /// within a given time to live period. All keys attached to the lease will be expired and
427    /// deleted if the lease expires. Each expired key generates a delete event in the event history.
428    #[inline]
429    pub async fn lease_grant(
430        &mut self,
431        ttl: i64,
432        options: Option<LeaseGrantOptions>,
433    ) -> Result<LeaseGrantResponse> {
434        self.lease.grant(ttl, options).await
435    }
436
437    /// Revokes a lease. All keys attached to the lease will expire and be deleted.
438    #[inline]
439    pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
440        self.lease.revoke(id).await
441    }
442
443    /// Keeps the lease alive by streaming keep alive requests from the client
444    /// to the server and streaming keep alive responses from the server to the client.
445    #[inline]
446    pub async fn lease_keep_alive(
447        &mut self,
448        id: i64,
449    ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
450        self.lease.keep_alive(id).await
451    }
452
453    /// Retrieves lease information.
454    #[inline]
455    pub async fn lease_time_to_live(
456        &mut self,
457        id: i64,
458        options: Option<LeaseTimeToLiveOptions>,
459    ) -> Result<LeaseTimeToLiveResponse> {
460        self.lease.time_to_live(id, options).await
461    }
462
463    /// Lists all existing leases.
464    #[inline]
465    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
466        self.lease.leases().await
467    }
468
469    /// Lock acquires a distributed shared lock on a given named lock.
470    /// On success, it will return a unique key that exists so long as the
471    /// lock is held by the caller. This key can be used in conjunction with
472    /// transactions to safely ensure updates to etcd only occur while holding
473    /// lock ownership. The lock is held until Unlock is called on the key or the
474    /// lease associate with the owner expires.
475    #[inline]
476    pub async fn lock(
477        &mut self,
478        name: impl Into<Vec<u8>>,
479        options: Option<LockOptions>,
480    ) -> Result<LockResponse> {
481        self.lock.lock(name, options).await
482    }
483
484    /// Unlock takes a key returned by Lock and releases the hold on lock. The
485    /// next Lock caller waiting for the lock will then be woken up and given
486    /// ownership of the lock.
487    #[inline]
488    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
489        self.lock.unlock(key).await
490    }
491
492    /// Enables authentication.
493    #[inline]
494    pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
495        self.auth.auth_enable().await
496    }
497
498    /// Disables authentication.
499    #[inline]
500    pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
501        self.auth.auth_disable().await
502    }
503
504    /// Adds role.
505    #[inline]
506    pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
507        self.auth.role_add(name).await
508    }
509
510    /// Deletes role.
511    #[inline]
512    pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
513        self.auth.role_delete(name).await
514    }
515
516    /// Gets role.
517    #[inline]
518    pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
519        self.auth.role_get(name).await
520    }
521
522    /// Lists role.
523    #[inline]
524    pub async fn role_list(&mut self) -> Result<RoleListResponse> {
525        self.auth.role_list().await
526    }
527
528    /// Grants role permission.
529    #[inline]
530    pub async fn role_grant_permission(
531        &mut self,
532        name: impl Into<String>,
533        perm: Permission,
534    ) -> Result<RoleGrantPermissionResponse> {
535        self.auth.role_grant_permission(name, perm).await
536    }
537
538    /// Revokes role permission.
539    #[inline]
540    pub async fn role_revoke_permission(
541        &mut self,
542        name: impl Into<String>,
543        key: impl Into<Vec<u8>>,
544        options: Option<RoleRevokePermissionOptions>,
545    ) -> Result<RoleRevokePermissionResponse> {
546        self.auth.role_revoke_permission(name, key, options).await
547    }
548
549    /// Add an user.
550    #[inline]
551    pub async fn user_add(
552        &mut self,
553        name: impl Into<String>,
554        password: impl Into<String>,
555        options: Option<UserAddOptions>,
556    ) -> Result<UserAddResponse> {
557        self.auth.user_add(name, password, options).await
558    }
559
560    /// Gets the user info by the user name.
561    #[inline]
562    pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
563        self.auth.user_get(name).await
564    }
565
566    /// Lists all users.
567    #[inline]
568    pub async fn user_list(&mut self) -> Result<UserListResponse> {
569        self.auth.user_list().await
570    }
571
572    /// Deletes the given key from the key-value store.
573    #[inline]
574    pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
575        self.auth.user_delete(name).await
576    }
577
578    /// Change password for an user.
579    #[inline]
580    pub async fn user_change_password(
581        &mut self,
582        name: impl Into<String>,
583        password: impl Into<String>,
584    ) -> Result<UserChangePasswordResponse> {
585        self.auth.user_change_password(name, password).await
586    }
587
588    /// Grant role for an user.
589    #[inline]
590    pub async fn user_grant_role(
591        &mut self,
592        user: impl Into<String>,
593        role: impl Into<String>,
594    ) -> Result<UserGrantRoleResponse> {
595        self.auth.user_grant_role(user, role).await
596    }
597
598    /// Revoke role for an user.
599    #[inline]
600    pub async fn user_revoke_role(
601        &mut self,
602        user: impl Into<String>,
603        role: impl Into<String>,
604    ) -> Result<UserRevokeRoleResponse> {
605        self.auth.user_revoke_role(user, role).await
606    }
607
608    /// Maintain(get, active or inactive) alarms of members.
609    #[inline]
610    pub async fn alarm(
611        &mut self,
612        alarm_action: AlarmAction,
613        alarm_type: AlarmType,
614        options: Option<AlarmOptions>,
615    ) -> Result<AlarmResponse> {
616        self.maintenance
617            .alarm(alarm_action, alarm_type, options)
618            .await
619    }
620
621    /// Gets the status of a member.
622    #[inline]
623    pub async fn status(&mut self) -> Result<StatusResponse> {
624        self.maintenance.status().await
625    }
626
627    /// Defragments a member's backend database to recover storage space.
628    #[inline]
629    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
630        self.maintenance.defragment().await
631    }
632
633    /// Computes the hash of whole backend keyspace.
634    /// including key, lease, and other buckets in storage.
635    /// This is designed for testing ONLY!
636    #[inline]
637    pub async fn hash(&mut self) -> Result<HashResponse> {
638        self.maintenance.hash().await
639    }
640
641    /// Computes the hash of all MVCC keys up to a given revision.
642    /// It only iterates \"key\" bucket in backend storage.
643    #[inline]
644    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
645        self.maintenance.hash_kv(revision).await
646    }
647
648    /// Gets a snapshot of the entire backend from a member over a stream to a client.
649    #[inline]
650    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
651        self.maintenance.snapshot().await
652    }
653
654    /// Adds current connected server as a member.
655    #[inline]
656    pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
657        &mut self,
658        urls: S,
659        options: Option<MemberAddOptions>,
660    ) -> Result<MemberAddResponse> {
661        let mut eps = Vec::new();
662        for e in urls.as_ref() {
663            let e = e.as_ref();
664            let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
665                e.to_string()
666            } else {
667                HTTP_PREFIX.to_owned() + e
668            };
669            eps.push(url);
670        }
671
672        self.cluster.member_add(eps, options).await
673    }
674
675    /// Remove a member.
676    #[inline]
677    pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
678        self.cluster.member_remove(id).await
679    }
680
681    /// Updates the member.
682    #[inline]
683    pub async fn member_update(
684        &mut self,
685        id: u64,
686        url: impl Into<Vec<String>>,
687    ) -> Result<MemberUpdateResponse> {
688        self.cluster.member_update(id, url).await
689    }
690
691    /// Promotes the member.
692    #[inline]
693    pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
694        self.cluster.member_promote(id).await
695    }
696
697    /// Lists members.
698    #[inline]
699    pub async fn member_list(&mut self) -> Result<MemberListResponse> {
700        self.cluster.member_list().await
701    }
702
703    /// Moves the current leader node to target node.
704    #[inline]
705    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
706        self.maintenance.move_leader(target_id).await
707    }
708
709    /// Puts a value as eligible for the election on the prefix key.
710    /// Multiple sessions can participate in the election for the
711    /// same prefix, but only one can be the leader at a time.
712    #[inline]
713    pub async fn campaign(
714        &mut self,
715        name: impl Into<Vec<u8>>,
716        value: impl Into<Vec<u8>>,
717        lease: i64,
718    ) -> Result<CampaignResponse> {
719        self.election.campaign(name, value, lease).await
720    }
721
722    /// Lets the leader announce a new value without another election.
723    #[inline]
724    pub async fn proclaim(
725        &mut self,
726        value: impl Into<Vec<u8>>,
727        options: Option<ProclaimOptions>,
728    ) -> Result<ProclaimResponse> {
729        self.election.proclaim(value, options).await
730    }
731
732    /// Returns the leader value for the current election.
733    #[inline]
734    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
735        self.election.leader(name).await
736    }
737
738    /// Returns a channel that reliably observes ordered leader proposals
739    /// as GetResponse values on every current elected leader key.
740    #[inline]
741    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
742        self.election.observe(name).await
743    }
744
745    /// Releases election leadership and then start a new election
746    #[inline]
747    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
748        self.election.resign(option).await
749    }
750
751    async fn do_authenticate(
752        &self,
753        user: String,
754        password: String,
755    ) -> Result<MetadataValue<Ascii>> {
756        let resp = self.auth_client().authenticate(user, password).await?;
757        let token = resp.token().parse()?;
758        Ok(token)
759    }
760
761    /// Refresh the authentication token if the client has credentials options.
762    pub async fn refresh_token(&self) -> Result<()> {
763        if let Some((user, password)) = self.options.user.as_ref() {
764            let token = self.do_authenticate(user.clone(), password.clone()).await?;
765            self.auth_token.write_unpoisoned().replace(token);
766        } else {
767            let _ = self.auth_token.write_unpoisoned().take();
768        }
769        Ok(())
770    }
771
772    /// Updates the user credentials for the client in flight.
773    ///
774    /// Client will perform the authentication with the given user credentials. If successful, the
775    /// authentication token will be updated in the client. Nothing happens if the authentication
776    /// fails.
777    ///
778    /// If the user is `None`, it will remove the authentication token from the client.
779    pub async fn update_user(&mut self, user: Option<(String, String)>) -> Result<()> {
780        if let Some((ref name, ref password)) = user {
781            let token = self.do_authenticate(name.clone(), password.clone()).await?;
782            self.auth_token.write_unpoisoned().replace(token);
783        } else {
784            let _ = self.auth_token.write_unpoisoned().take();
785        }
786        self.options.user = user;
787        Ok(())
788    }
789}
790
/// Options for `Connect` operation.
///
/// Values are set through the `with_*` builder methods. Note that the derived
/// `Default` leaves `keep_alive_while_idle` as `false`, while
/// `ConnectOptions::new` sets it to `true`.
#[derive(Debug, Default, Clone)]
pub struct ConnectOptions {
    /// user is a pair values of name and password
    user: Option<(String, String)>,
    /// HTTP2 keep-alive: (keep_alive_interval, keep_alive_timeout)
    keep_alive: Option<(Duration, Duration)>,
    /// Whether send keep alive pings even there are no active streams.
    /// Only applied to an endpoint when `keep_alive` is set.
    keep_alive_while_idle: bool,
    /// Apply a timeout to each gRPC request.
    timeout: Option<Duration>,
    /// Apply a timeout to connecting to the endpoint.
    connect_timeout: Option<Duration>,
    /// TCP keepalive.
    tcp_keepalive: Option<Duration>,
    /// TLS options applied to HTTPS endpoints (feature `tls`).
    #[cfg(feature = "tls")]
    tls: Option<TlsOptions>,
    /// Result of building the OpenSSL connector (feature `tls-openssl`); a build
    /// error is kept here and only surfaces when connecting.
    #[cfg(feature = "tls-openssl")]
    otls: Option<OpenSslResult<OpenSslConnector>>,
    /// Require a leader to be present for the operation to complete.
    require_leader: bool,
}
813
814impl ConnectOptions {
815    /// name is the identifier for the distributed shared lock to be acquired.
816    #[inline]
817    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
818        self.user = Some((name.into(), password.into()));
819        self
820    }
821
822    /// Sets TLS options.
823    ///
824    /// Notes that this function have to work with `HTTPS` URLs.
825    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
826    #[cfg(feature = "tls")]
827    #[inline]
828    pub fn with_tls(mut self, tls: TlsOptions) -> Self {
829        self.tls = Some(tls);
830        self
831    }
832
833    /// Sets TLS options, however using the OpenSSL implementation.
834    #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
835    #[cfg(feature = "tls-openssl")]
836    #[inline]
837    pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
838        // NOTE1: Perhaps we can unify the essential TLS config terms by something like `TlsBuilder`?
839        //
840        // NOTE2: we delay the checking at connection step to keep consistency with tonic, however would
841        // things be better if we validate the config at here?
842        self.otls = Some(otls.build());
843        self
844    }
845
846    /// Enable HTTP2 keep-alive with `interval` and `timeout`.
847    #[inline]
848    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
849        self.keep_alive = Some((interval, timeout));
850        self
851    }
852
853    /// Apply a timeout to each request.
854    #[inline]
855    pub fn with_timeout(mut self, timeout: Duration) -> Self {
856        self.timeout = Some(timeout);
857        self
858    }
859
860    /// Apply a timeout to connecting to the endpoint.
861    #[inline]
862    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
863        self.connect_timeout = Some(timeout);
864        self
865    }
866
867    /// Enable TCP keepalive.
868    #[inline]
869    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
870        self.tcp_keepalive = Some(tcp_keepalive);
871        self
872    }
873
874    /// Whether send keep alive pings even there are no active requests.
875    /// If disabled, keep-alive pings are only sent while there are opened request/response streams.
876    /// If enabled, pings are also sent when no streams are active.
877    /// NOTE: Some implementations of gRPC server may send GOAWAY if there are too many pings.
878    ///       This would be useful if you meet some error like `too many pings`.
879    #[inline]
880    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
881        self.keep_alive_while_idle = enabled;
882        self
883    }
884
885    /// Whether to enforce that a leader be present in the etcd cluster.
886    #[inline]
887    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
888        self.require_leader = require_leader;
889        self
890    }
891
892    /// Creates a `ConnectOptions`.
893    #[inline]
894    pub const fn new() -> Self {
895        ConnectOptions {
896            user: None,
897            keep_alive: None,
898            keep_alive_while_idle: true,
899            timeout: None,
900            connect_timeout: None,
901            tcp_keepalive: None,
902            #[cfg(feature = "tls")]
903            tls: None,
904            #[cfg(feature = "tls-openssl")]
905            otls: None,
906            require_leader: false,
907        }
908    }
909}