1#[cfg(feature = "raw-channel")]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13 RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14 RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15 UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16 UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19 ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20 MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23 CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24 ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27 CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28 KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31 LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32 LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36 AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37 HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use tonic::metadata::{Ascii, MetadataValue};
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::{channel::Change, Endpoint};
53
54const HTTP_PREFIX: &str = "http://";
55const HTTPS_PREFIX: &str = "https://";
56
57#[derive(Clone)]
59pub struct Client {
60 kv: KvClient,
61 watch: WatchClient,
62 lease: LeaseClient,
63 lock: LockClient,
64 auth: AuthClient,
65 maintenance: MaintenanceClient,
66 cluster: ClusterClient,
67 election: ElectionClient,
68 options: ConnectOptions,
69 tx: Option<Sender<Change<Uri, Endpoint>>>,
70 auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
71}
72
73impl Client {
74 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
76 endpoints: S,
77 options: Option<ConnectOptions>,
78 ) -> Result<Self> {
79 #[cfg(not(feature = "tls-openssl"))]
80 let make_balanced_channel = crate::channel::Tonic;
81 #[cfg(feature = "tls-openssl")]
82 let make_balanced_channel = crate::channel::Openssl {
83 conn: options
84 .clone()
85 .and_then(|o| o.otls)
86 .unwrap_or_else(OpenSslConnector::create_default)?,
87 };
88 Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
89 }
90
91 pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
93 endpoints: S,
94 options: Option<ConnectOptions>,
95 make_balanced_channel: MBC,
96 ) -> Result<Self>
97 where
98 MBC: crate::channel::BalancedChannelBuilder,
99 crate::error::Error: From<MBC::Error>,
100 {
101 let options = options.unwrap_or_default();
102 let endpoints = {
103 let mut eps = Vec::new();
104 for e in endpoints.as_ref() {
105 let channel = Self::build_endpoint(e.as_ref(), &options)?;
106 eps.push(channel);
107 }
108 eps
109 };
110
111 if endpoints.is_empty() {
112 return Err(Error::InvalidArgs(String::from("empty endpoints")));
113 }
114
115 let auth_token = Arc::new(RwLock::new(None));
116
117 let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
119 let channel = InterceptedChannel::new(
120 channel,
121 Interceptor {
122 require_leader: options.require_leader,
123 auth_token: auth_token.clone(),
124 },
125 );
126 for endpoint in endpoints {
127 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
130 .await
131 .map_err(|_| {
132 Error::Internal("failed to insert endpoint into the balanced channel".into())
133 })?;
134 }
135
136 let client = Self::build_client(channel, Some(tx), auth_token, options);
137 client.refresh_token().await?;
138 Ok(client)
139 }
140
141 #[cfg(feature = "raw-channel")]
142 pub async fn from_channel(channel: Channel, options: Option<ConnectOptions>) -> Result<Self> {
144 let options = options.unwrap_or_default();
145 let auth_token = Arc::new(RwLock::new(None));
146 let channel = InterceptedChannel::new(
147 channel,
148 Interceptor {
149 require_leader: options.require_leader,
150 auth_token: auth_token.clone(),
151 },
152 );
153
154 let client = Self::build_client(channel, None, auth_token, options);
155 client.refresh_token().await?;
156 Ok(client)
157 }
158
159 fn build_endpoint(url: &str, options: &ConnectOptions) -> Result<Endpoint> {
160 use tonic::transport::Channel as TonicChannel;
161 let mut endpoint = if url.starts_with(HTTP_PREFIX) {
162 #[cfg(feature = "tls")]
163 if options.tls.is_some() {
164 return Err(Error::InvalidArgs(String::from(
165 "TLS options are only supported with HTTPS URLs",
166 )));
167 }
168
169 TonicChannel::builder(url.parse()?)
170 } else if url.starts_with(HTTPS_PREFIX) {
171 #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
172 return Err(Error::InvalidArgs(String::from(
173 "HTTPS URLs are only supported with the feature \"tls\"",
174 )));
175
176 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
177 {
178 TonicChannel::builder(url.parse()?)
179 }
180
181 #[cfg(feature = "tls")]
182 {
183 let tls = options.tls.clone().unwrap_or_default();
184 TonicChannel::builder(url.parse()?).tls_config(tls)?
185 }
186 } else {
187 #[cfg(feature = "tls")]
188 {
189 let tls = options.tls.clone();
190
191 match tls {
192 Some(tls) => {
193 let e = HTTPS_PREFIX.to_owned() + url;
194 TonicChannel::builder(e.parse()?).tls_config(tls)?
195 }
196 None => {
197 let e = HTTP_PREFIX.to_owned() + url;
198 TonicChannel::builder(e.parse()?)
199 }
200 }
201 }
202
203 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
204 {
205 let pfx = if options.otls.as_ref().is_some() {
206 HTTPS_PREFIX
207 } else {
208 HTTP_PREFIX
209 };
210 let e = pfx.to_owned() + url;
211 TonicChannel::builder(e.parse()?)
212 }
213
214 #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
215 {
216 let e = HTTP_PREFIX.to_owned() + url;
217 TonicChannel::builder(e.parse()?)
218 }
219 };
220
221 if let Some((interval, timeout)) = options.keep_alive {
222 endpoint = endpoint
223 .keep_alive_while_idle(options.keep_alive_while_idle)
224 .http2_keep_alive_interval(interval)
225 .keep_alive_timeout(timeout);
226 }
227
228 if let Some(timeout) = options.timeout {
229 endpoint = endpoint.timeout(timeout);
230 }
231
232 if let Some(timeout) = options.connect_timeout {
233 endpoint = endpoint.connect_timeout(timeout);
234 }
235
236 if let Some(tcp_keepalive) = options.tcp_keepalive {
237 endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
238 }
239
240 Ok(endpoint)
241 }
242
243 fn build_client(
244 channel: InterceptedChannel,
245 tx: Option<Sender<Change<Uri, Endpoint>>>,
246 auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
247 options: ConnectOptions,
248 ) -> Self {
249 let kv = KvClient::new(channel.clone());
250 let watch = WatchClient::new(channel.clone());
251 let lease = LeaseClient::new(channel.clone());
252 let lock = LockClient::new(channel.clone());
253 let auth = AuthClient::new(channel.clone());
254 let cluster = ClusterClient::new(channel.clone());
255 let maintenance = MaintenanceClient::new(channel.clone());
256 let election = ElectionClient::new(channel);
257
258 Self {
259 kv,
260 watch,
261 lease,
262 lock,
263 auth,
264 maintenance,
265 cluster,
266 election,
267 options,
268 tx,
269 auth_token,
270 }
271 }
272
273 #[inline]
284 pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
285 let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
286 let Some(tx) = &self.tx else {
287 return Err(Error::EndpointsNotManaged);
288 };
289 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
290 .await
291 .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {e}")))
292 }
293
294 #[inline]
300 pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
301 let uri = http::Uri::from_str(endpoint.as_ref())?;
302 let Some(tx) = &self.tx else {
303 return Err(Error::EndpointsNotManaged);
304 };
305 tx.send(Change::Remove(uri))
306 .await
307 .map_err(|e| Error::EndpointError(format!("failed to remove endpoint because of {e}")))
308 }
309
310 #[inline]
312 pub fn kv_client(&self) -> KvClient {
313 self.kv.clone()
314 }
315
316 #[inline]
318 pub fn watch_client(&self) -> WatchClient {
319 self.watch.clone()
320 }
321
322 #[inline]
324 pub fn lease_client(&self) -> LeaseClient {
325 self.lease.clone()
326 }
327
328 #[inline]
330 pub fn auth_client(&self) -> AuthClient {
331 self.auth.clone()
332 }
333
334 #[inline]
336 pub fn maintenance_client(&self) -> MaintenanceClient {
337 self.maintenance.clone()
338 }
339
340 #[inline]
342 pub fn cluster_client(&self) -> ClusterClient {
343 self.cluster.clone()
344 }
345
346 #[inline]
348 pub fn lock_client(&self) -> LockClient {
349 self.lock.clone()
350 }
351
352 #[inline]
354 pub fn election_client(&self) -> ElectionClient {
355 self.election.clone()
356 }
357
358 #[inline]
362 pub async fn put(
363 &mut self,
364 key: impl Into<Vec<u8>>,
365 value: impl Into<Vec<u8>>,
366 options: Option<PutOptions>,
367 ) -> Result<PutResponse> {
368 self.kv.put(key, value, options).await
369 }
370
371 #[inline]
373 pub async fn get(
374 &mut self,
375 key: impl Into<Vec<u8>>,
376 options: Option<GetOptions>,
377 ) -> Result<GetResponse> {
378 self.kv.get(key, options).await
379 }
380
381 #[inline]
383 pub async fn delete(
384 &mut self,
385 key: impl Into<Vec<u8>>,
386 options: Option<DeleteOptions>,
387 ) -> Result<DeleteResponse> {
388 self.kv.delete(key, options).await
389 }
390
391 #[inline]
395 pub async fn compact(
396 &mut self,
397 revision: i64,
398 options: Option<CompactionOptions>,
399 ) -> Result<CompactionResponse> {
400 self.kv.compact(revision, options).await
401 }
402
403 #[inline]
408 pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
409 self.kv.txn(txn).await
410 }
411
412 #[inline]
417 pub async fn watch(
418 &mut self,
419 key: impl Into<Vec<u8>>,
420 options: Option<WatchOptions>,
421 ) -> Result<WatchStream> {
422 self.watch.watch(key, options).await
423 }
424
425 #[inline]
429 pub async fn lease_grant(
430 &mut self,
431 ttl: i64,
432 options: Option<LeaseGrantOptions>,
433 ) -> Result<LeaseGrantResponse> {
434 self.lease.grant(ttl, options).await
435 }
436
437 #[inline]
439 pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
440 self.lease.revoke(id).await
441 }
442
443 #[inline]
446 pub async fn lease_keep_alive(
447 &mut self,
448 id: i64,
449 ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
450 self.lease.keep_alive(id).await
451 }
452
453 #[inline]
455 pub async fn lease_time_to_live(
456 &mut self,
457 id: i64,
458 options: Option<LeaseTimeToLiveOptions>,
459 ) -> Result<LeaseTimeToLiveResponse> {
460 self.lease.time_to_live(id, options).await
461 }
462
463 #[inline]
465 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
466 self.lease.leases().await
467 }
468
469 #[inline]
476 pub async fn lock(
477 &mut self,
478 name: impl Into<Vec<u8>>,
479 options: Option<LockOptions>,
480 ) -> Result<LockResponse> {
481 self.lock.lock(name, options).await
482 }
483
484 #[inline]
488 pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
489 self.lock.unlock(key).await
490 }
491
492 #[inline]
494 pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
495 self.auth.auth_enable().await
496 }
497
498 #[inline]
500 pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
501 self.auth.auth_disable().await
502 }
503
504 #[inline]
506 pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
507 self.auth.role_add(name).await
508 }
509
510 #[inline]
512 pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
513 self.auth.role_delete(name).await
514 }
515
516 #[inline]
518 pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
519 self.auth.role_get(name).await
520 }
521
522 #[inline]
524 pub async fn role_list(&mut self) -> Result<RoleListResponse> {
525 self.auth.role_list().await
526 }
527
528 #[inline]
530 pub async fn role_grant_permission(
531 &mut self,
532 name: impl Into<String>,
533 perm: Permission,
534 ) -> Result<RoleGrantPermissionResponse> {
535 self.auth.role_grant_permission(name, perm).await
536 }
537
538 #[inline]
540 pub async fn role_revoke_permission(
541 &mut self,
542 name: impl Into<String>,
543 key: impl Into<Vec<u8>>,
544 options: Option<RoleRevokePermissionOptions>,
545 ) -> Result<RoleRevokePermissionResponse> {
546 self.auth.role_revoke_permission(name, key, options).await
547 }
548
549 #[inline]
551 pub async fn user_add(
552 &mut self,
553 name: impl Into<String>,
554 password: impl Into<String>,
555 options: Option<UserAddOptions>,
556 ) -> Result<UserAddResponse> {
557 self.auth.user_add(name, password, options).await
558 }
559
560 #[inline]
562 pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
563 self.auth.user_get(name).await
564 }
565
566 #[inline]
568 pub async fn user_list(&mut self) -> Result<UserListResponse> {
569 self.auth.user_list().await
570 }
571
572 #[inline]
574 pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
575 self.auth.user_delete(name).await
576 }
577
578 #[inline]
580 pub async fn user_change_password(
581 &mut self,
582 name: impl Into<String>,
583 password: impl Into<String>,
584 ) -> Result<UserChangePasswordResponse> {
585 self.auth.user_change_password(name, password).await
586 }
587
588 #[inline]
590 pub async fn user_grant_role(
591 &mut self,
592 user: impl Into<String>,
593 role: impl Into<String>,
594 ) -> Result<UserGrantRoleResponse> {
595 self.auth.user_grant_role(user, role).await
596 }
597
598 #[inline]
600 pub async fn user_revoke_role(
601 &mut self,
602 user: impl Into<String>,
603 role: impl Into<String>,
604 ) -> Result<UserRevokeRoleResponse> {
605 self.auth.user_revoke_role(user, role).await
606 }
607
608 #[inline]
610 pub async fn alarm(
611 &mut self,
612 alarm_action: AlarmAction,
613 alarm_type: AlarmType,
614 options: Option<AlarmOptions>,
615 ) -> Result<AlarmResponse> {
616 self.maintenance
617 .alarm(alarm_action, alarm_type, options)
618 .await
619 }
620
621 #[inline]
623 pub async fn status(&mut self) -> Result<StatusResponse> {
624 self.maintenance.status().await
625 }
626
627 #[inline]
629 pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
630 self.maintenance.defragment().await
631 }
632
633 #[inline]
637 pub async fn hash(&mut self) -> Result<HashResponse> {
638 self.maintenance.hash().await
639 }
640
641 #[inline]
644 pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
645 self.maintenance.hash_kv(revision).await
646 }
647
648 #[inline]
650 pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
651 self.maintenance.snapshot().await
652 }
653
654 #[inline]
656 pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
657 &mut self,
658 urls: S,
659 options: Option<MemberAddOptions>,
660 ) -> Result<MemberAddResponse> {
661 let mut eps = Vec::new();
662 for e in urls.as_ref() {
663 let e = e.as_ref();
664 let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
665 e.to_string()
666 } else {
667 HTTP_PREFIX.to_owned() + e
668 };
669 eps.push(url);
670 }
671
672 self.cluster.member_add(eps, options).await
673 }
674
675 #[inline]
677 pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
678 self.cluster.member_remove(id).await
679 }
680
681 #[inline]
683 pub async fn member_update(
684 &mut self,
685 id: u64,
686 url: impl Into<Vec<String>>,
687 ) -> Result<MemberUpdateResponse> {
688 self.cluster.member_update(id, url).await
689 }
690
691 #[inline]
693 pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
694 self.cluster.member_promote(id).await
695 }
696
697 #[inline]
699 pub async fn member_list(&mut self) -> Result<MemberListResponse> {
700 self.cluster.member_list().await
701 }
702
703 #[inline]
705 pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
706 self.maintenance.move_leader(target_id).await
707 }
708
709 #[inline]
713 pub async fn campaign(
714 &mut self,
715 name: impl Into<Vec<u8>>,
716 value: impl Into<Vec<u8>>,
717 lease: i64,
718 ) -> Result<CampaignResponse> {
719 self.election.campaign(name, value, lease).await
720 }
721
722 #[inline]
724 pub async fn proclaim(
725 &mut self,
726 value: impl Into<Vec<u8>>,
727 options: Option<ProclaimOptions>,
728 ) -> Result<ProclaimResponse> {
729 self.election.proclaim(value, options).await
730 }
731
732 #[inline]
734 pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
735 self.election.leader(name).await
736 }
737
738 #[inline]
741 pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
742 self.election.observe(name).await
743 }
744
745 #[inline]
747 pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
748 self.election.resign(option).await
749 }
750
751 async fn do_authenticate(
752 &self,
753 user: String,
754 password: String,
755 ) -> Result<MetadataValue<Ascii>> {
756 let resp = self.auth_client().authenticate(user, password).await?;
757 let token = resp.token().parse()?;
758 Ok(token)
759 }
760
761 pub async fn refresh_token(&self) -> Result<()> {
763 if let Some((user, password)) = self.options.user.as_ref() {
764 let token = self.do_authenticate(user.clone(), password.clone()).await?;
765 self.auth_token.write_unpoisoned().replace(token);
766 } else {
767 let _ = self.auth_token.write_unpoisoned().take();
768 }
769 Ok(())
770 }
771
772 pub async fn update_user(&mut self, user: Option<(String, String)>) -> Result<()> {
780 if let Some((ref name, ref password)) = user {
781 let token = self.do_authenticate(name.clone(), password.clone()).await?;
782 self.auth_token.write_unpoisoned().replace(token);
783 } else {
784 let _ = self.auth_token.write_unpoisoned().take();
785 }
786 self.options.user = user;
787 Ok(())
788 }
789}
790
791#[derive(Debug, Default, Clone)]
793pub struct ConnectOptions {
794 user: Option<(String, String)>,
796 keep_alive: Option<(Duration, Duration)>,
798 keep_alive_while_idle: bool,
800 timeout: Option<Duration>,
802 connect_timeout: Option<Duration>,
804 tcp_keepalive: Option<Duration>,
806 #[cfg(feature = "tls")]
807 tls: Option<TlsOptions>,
808 #[cfg(feature = "tls-openssl")]
809 otls: Option<OpenSslResult<OpenSslConnector>>,
810 require_leader: bool,
812}
813
814impl ConnectOptions {
815 #[inline]
817 pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
818 self.user = Some((name.into(), password.into()));
819 self
820 }
821
822 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
826 #[cfg(feature = "tls")]
827 #[inline]
828 pub fn with_tls(mut self, tls: TlsOptions) -> Self {
829 self.tls = Some(tls);
830 self
831 }
832
833 #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
835 #[cfg(feature = "tls-openssl")]
836 #[inline]
837 pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
838 self.otls = Some(otls.build());
843 self
844 }
845
846 #[inline]
848 pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
849 self.keep_alive = Some((interval, timeout));
850 self
851 }
852
853 #[inline]
855 pub fn with_timeout(mut self, timeout: Duration) -> Self {
856 self.timeout = Some(timeout);
857 self
858 }
859
860 #[inline]
862 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
863 self.connect_timeout = Some(timeout);
864 self
865 }
866
867 #[inline]
869 pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
870 self.tcp_keepalive = Some(tcp_keepalive);
871 self
872 }
873
874 #[inline]
880 pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
881 self.keep_alive_while_idle = enabled;
882 self
883 }
884
885 #[inline]
887 pub fn with_require_leader(mut self, require_leader: bool) -> Self {
888 self.require_leader = require_leader;
889 self
890 }
891
892 #[inline]
894 pub const fn new() -> Self {
895 ConnectOptions {
896 user: None,
897 keep_alive: None,
898 keep_alive_while_idle: true,
899 timeout: None,
900 connect_timeout: None,
901 tcp_keepalive: None,
902 #[cfg(feature = "tls")]
903 tls: None,
904 #[cfg(feature = "tls-openssl")]
905 otls: None,
906 require_leader: false,
907 }
908 }
909}