1#[cfg(feature = "raw-channel")]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13 RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14 RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15 UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16 UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19 ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20 MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23 CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24 ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27 CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28 KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31 LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32 LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36 AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37 HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use http::HeaderValue;
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::{channel::Change, Endpoint};
53
/// Scheme prefix of plaintext endpoint URLs. It is also prepended to
/// scheme-less addresses when no TLS configuration applies.
const HTTP_PREFIX: &str = "http://";
/// Scheme prefix of TLS endpoint URLs. It is also prepended to scheme-less
/// addresses when a TLS configuration is present in the connect options.
const HTTPS_PREFIX: &str = "https://";
56
/// Aggregated client that bundles one sub-client per RPC service
/// (KV, watch, lease, lock, auth, maintenance, cluster and election).
///
/// Cloning the client clones every sub-client together with the stored
/// options and the endpoint-change sender.
#[derive(Clone)]
pub struct Client {
    /// Sub-client for key-value requests.
    kv: KvClient,
    /// Sub-client for watch requests.
    watch: WatchClient,
    /// Sub-client for lease requests.
    lease: LeaseClient,
    /// Sub-client for lock requests.
    lock: LockClient,
    /// Sub-client for authentication, role and user requests.
    auth: AuthClient,
    /// Sub-client for maintenance requests.
    maintenance: MaintenanceClient,
    /// Sub-client for cluster membership requests.
    cluster: ClusterClient,
    /// Sub-client for election requests.
    election: ElectionClient,
    /// Options supplied at construction time (the `user` credentials have
    /// already been taken out by `auth`); reused when building endpoints that
    /// are added later.
    options: Option<ConnectOptions>,
    /// Sender used to insert or remove endpoints of the balanced channel.
    /// `None` when the client was built from a caller-supplied channel.
    tx: Option<Sender<Change<Uri, Endpoint>>>,
}
71
72impl Client {
73 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
75 endpoints: S,
76 options: Option<ConnectOptions>,
77 ) -> Result<Self> {
78 #[cfg(not(feature = "tls-openssl"))]
79 let make_balanced_channel = crate::channel::Tonic;
80 #[cfg(feature = "tls-openssl")]
81 let make_balanced_channel = crate::channel::Openssl {
82 conn: options
83 .clone()
84 .and_then(|o| o.otls)
85 .unwrap_or_else(OpenSslConnector::create_default)?,
86 };
87 Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
88 }
89
90 pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
92 endpoints: S,
93 options: Option<ConnectOptions>,
94 make_balanced_channel: MBC,
95 ) -> Result<Self>
96 where
97 MBC: crate::channel::BalancedChannelBuilder,
98 crate::error::Error: From<MBC::Error>,
99 {
100 let endpoints = {
101 let mut eps = Vec::new();
102 for e in endpoints.as_ref() {
103 let channel = Self::build_endpoint(e.as_ref(), &options)?;
104 eps.push(channel);
105 }
106 eps
107 };
108
109 if endpoints.is_empty() {
110 return Err(Error::InvalidArgs(String::from("empty endpoints")));
111 }
112
113 let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
115 let channel = InterceptedChannel::new(
116 channel,
117 Interceptor {
118 require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
119 },
120 );
121 for endpoint in endpoints {
122 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
124 .await
125 .unwrap();
126 }
127
128 let mut options = options;
129
130 let auth_token = Arc::new(RwLock::new(None));
131 Self::auth(channel.clone(), &mut options, &auth_token).await?;
132
133 Ok(Self::build_client(channel, Some(tx), auth_token, options))
134 }
135
136 #[cfg(feature = "raw-channel")]
137 pub async fn from_channel(channel: Channel, options: Option<ConnectOptions>) -> Result<Self> {
139 let channel = InterceptedChannel::new(
140 channel,
141 Interceptor {
142 require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
143 },
144 );
145 let mut options = options;
146
147 let auth_token = Arc::new(RwLock::new(None));
148 Self::auth(channel.clone(), &mut options, &auth_token).await?;
149
150 Ok(Self::build_client(channel, None, auth_token, options))
151 }
152
    /// Builds a tonic `Endpoint` for `url` and applies the transport settings
    /// found in `options`.
    ///
    /// Scheme handling depends on the enabled cargo features:
    /// - `http://…` is used as-is; with the `tls` feature it is an error to
    ///   combine it with TLS options.
    /// - `https://…` requires `tls` or `tls-openssl`; with `tls`, the
    ///   configured `TlsOptions` (or `TlsOptions::new()`) are applied.
    /// - Anything else gets `https://` prepended when a TLS / OpenSSL
    ///   configuration is present in `options`, and `http://` otherwise.
    ///
    /// Afterwards keep-alive, request timeout, connect timeout and TCP
    /// keep-alive are applied when they are set.
    ///
    /// # Errors
    ///
    /// `Error::InvalidArgs` for unsupported scheme/feature combinations. URI
    /// parsing and TLS configuration failures are propagated via `?`.
    fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
        use tonic::transport::Channel as TonicChannel;
        let mut endpoint = if url.starts_with(HTTP_PREFIX) {
            // Plaintext URL: TLS options make no sense here, so reject them.
            #[cfg(feature = "tls")]
            if let Some(connect_options) = options {
                if connect_options.tls.is_some() {
                    return Err(Error::InvalidArgs(String::from(
                        "TLS options are only supported with HTTPS URLs",
                    )));
                }
            }

            TonicChannel::builder(url.parse()?)
        } else if url.starts_with(HTTPS_PREFIX) {
            // No TLS backend compiled in: HTTPS cannot be served.
            #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
            return Err(Error::InvalidArgs(String::from(
                "HTTPS URLs are only supported with the feature \"tls\"",
            )));

            // OpenSSL only: no TLS config is attached to the endpoint here.
            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
            {
                TonicChannel::builder(url.parse()?)
            }

            // `tls` feature: attach the configured TLS options, or fresh ones.
            #[cfg(feature = "tls")]
            {
                let tls = if let Some(connect_options) = options {
                    connect_options.tls.clone()
                } else {
                    None
                }
                .unwrap_or_else(TlsOptions::new);

                TonicChannel::builder(url.parse()?).tls_config(tls)?
            }
        } else {
            // Scheme-less address: pick the scheme from the presence of a TLS
            // configuration and prepend it before parsing.
            #[cfg(feature = "tls")]
            {
                let tls = if let Some(connect_options) = options {
                    connect_options.tls.clone()
                } else {
                    None
                };

                match tls {
                    Some(tls) => {
                        let e = HTTPS_PREFIX.to_owned() + url;
                        TonicChannel::builder(e.parse()?).tls_config(tls)?
                    }
                    None => {
                        let e = HTTP_PREFIX.to_owned() + url;
                        TonicChannel::builder(e.parse()?)
                    }
                }
            }

            // OpenSSL only: HTTPS when an OpenSSL connector entry is present.
            #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
            {
                let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
                    HTTPS_PREFIX
                } else {
                    HTTP_PREFIX
                };
                let e = pfx.to_owned() + url;
                TonicChannel::builder(e.parse()?)
            }

            // No TLS backend at all: always plaintext.
            #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
            {
                let e = HTTP_PREFIX.to_owned() + url;
                TonicChannel::builder(e.parse()?)
            }
        };

        if let Some(opts) = options {
            // `keep_alive_while_idle` is only applied together with an
            // explicit keep-alive interval/timeout pair.
            if let Some((interval, timeout)) = opts.keep_alive {
                endpoint = endpoint
                    .keep_alive_while_idle(opts.keep_alive_while_idle)
                    .http2_keep_alive_interval(interval)
                    .keep_alive_timeout(timeout);
            }

            if let Some(timeout) = opts.timeout {
                endpoint = endpoint.timeout(timeout);
            }

            if let Some(timeout) = opts.connect_timeout {
                endpoint = endpoint.connect_timeout(timeout);
            }

            if let Some(tcp_keepalive) = opts.tcp_keepalive {
                endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
            }
        }

        Ok(endpoint)
    }
250
251 async fn auth(
252 channel: InterceptedChannel,
253 options: &mut Option<ConnectOptions>,
254 auth_token: &Arc<RwLock<Option<HeaderValue>>>,
255 ) -> Result<()> {
256 let user = match options {
257 None => return Ok(()),
258 Some(opt) => {
259 opt.user.take()
261 }
262 };
263
264 if let Some((name, password)) = user {
265 let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
266 let resp = tmp_auth.authenticate(name, password).await?;
267 auth_token.write_unpoisoned().replace(resp.token().parse()?);
268 }
269
270 Ok(())
271 }
272
273 fn build_client(
274 channel: InterceptedChannel,
275 tx: Option<Sender<Change<Uri, Endpoint>>>,
276 auth_token: Arc<RwLock<Option<HeaderValue>>>,
277 options: Option<ConnectOptions>,
278 ) -> Self {
279 let kv = KvClient::new(channel.clone(), auth_token.clone());
280 let watch = WatchClient::new(channel.clone(), auth_token.clone());
281 let lease = LeaseClient::new(channel.clone(), auth_token.clone());
282 let lock = LockClient::new(channel.clone(), auth_token.clone());
283 let auth = AuthClient::new(channel.clone(), auth_token.clone());
284 let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
285 let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
286 let election = ElectionClient::new(channel, auth_token);
287
288 Self {
289 kv,
290 watch,
291 lease,
292 lock,
293 auth,
294 maintenance,
295 cluster,
296 election,
297 options,
298 tx,
299 }
300 }
301
302 #[inline]
313 pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
314 let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
315 let Some(tx) = &self.tx else {
316 return Err(Error::EndpointsNotManaged);
317 };
318 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
319 .await
320 .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {e}")))
321 }
322
323 #[inline]
329 pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
330 let uri = http::Uri::from_str(endpoint.as_ref())?;
331 let Some(tx) = &self.tx else {
332 return Err(Error::EndpointsNotManaged);
333 };
334 tx.send(Change::Remove(uri))
335 .await
336 .map_err(|e| Error::EndpointError(format!("failed to remove endpoint because of {e}")))
337 }
338
    /// Returns a clone of the internal [`KvClient`].
    #[inline]
    pub fn kv_client(&self) -> KvClient {
        self.kv.clone()
    }
344
    /// Returns a clone of the internal [`WatchClient`].
    #[inline]
    pub fn watch_client(&self) -> WatchClient {
        self.watch.clone()
    }
350
    /// Returns a clone of the internal [`LeaseClient`].
    #[inline]
    pub fn lease_client(&self) -> LeaseClient {
        self.lease.clone()
    }
356
    /// Returns a clone of the internal [`AuthClient`].
    #[inline]
    pub fn auth_client(&self) -> AuthClient {
        self.auth.clone()
    }
362
    /// Returns a clone of the internal [`MaintenanceClient`].
    #[inline]
    pub fn maintenance_client(&self) -> MaintenanceClient {
        self.maintenance.clone()
    }
368
    /// Returns a clone of the internal [`ClusterClient`].
    #[inline]
    pub fn cluster_client(&self) -> ClusterClient {
        self.cluster.clone()
    }
374
    /// Returns a clone of the internal [`LockClient`].
    #[inline]
    pub fn lock_client(&self) -> LockClient {
        self.lock.clone()
    }
380
    /// Returns a clone of the internal [`ElectionClient`].
    #[inline]
    pub fn election_client(&self) -> ElectionClient {
        self.election.clone()
    }
386
    /// Forwards a put of `value` under `key` to the internal KV client.
    ///
    /// `options` is passed through unchanged; the result of `KvClient::put`
    /// is returned as-is.
    #[inline]
    pub async fn put(
        &mut self,
        key: impl Into<Vec<u8>>,
        value: impl Into<Vec<u8>>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        self.kv.put(key, value, options).await
    }
399
    /// Forwards a get for `key` to the internal KV client.
    ///
    /// `options` is passed through unchanged; the result of `KvClient::get`
    /// is returned as-is.
    #[inline]
    pub async fn get(
        &mut self,
        key: impl Into<Vec<u8>>,
        options: Option<GetOptions>,
    ) -> Result<GetResponse> {
        self.kv.get(key, options).await
    }
409
    /// Forwards a delete for `key` to the internal KV client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `KvClient::delete` is returned as-is.
    #[inline]
    pub async fn delete(
        &mut self,
        key: impl Into<Vec<u8>>,
        options: Option<DeleteOptions>,
    ) -> Result<DeleteResponse> {
        self.kv.delete(key, options).await
    }
419
    /// Forwards a compaction request for `revision` to the internal KV
    /// client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `KvClient::compact` is returned as-is.
    #[inline]
    pub async fn compact(
        &mut self,
        revision: i64,
        options: Option<CompactionOptions>,
    ) -> Result<CompactionResponse> {
        self.kv.compact(revision, options).await
    }
431
    /// Forwards the transaction `txn` to the internal KV client and returns
    /// the result of `KvClient::txn`.
    #[inline]
    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
        self.kv.txn(txn).await
    }
440
    /// Forwards a watch request for `key` to the internal watch client.
    ///
    /// `options` is passed through unchanged. On success the `Watcher` and
    /// `WatchStream` pair produced by `WatchClient::watch` is returned.
    #[inline]
    pub async fn watch(
        &mut self,
        key: impl Into<Vec<u8>>,
        options: Option<WatchOptions>,
    ) -> Result<(Watcher, WatchStream)> {
        self.watch.watch(key, options).await
    }
453
    /// Forwards a lease-grant request with the given `ttl` to the internal
    /// lease client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `LeaseClient::grant` is returned as-is.
    #[inline]
    pub async fn lease_grant(
        &mut self,
        ttl: i64,
        options: Option<LeaseGrantOptions>,
    ) -> Result<LeaseGrantResponse> {
        self.lease.grant(ttl, options).await
    }
465
    /// Forwards a revoke request for lease `id` to the internal lease client
    /// and returns the result of `LeaseClient::revoke`.
    #[inline]
    pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
        self.lease.revoke(id).await
    }
471
    /// Forwards a keep-alive request for lease `id` to the internal lease
    /// client.
    ///
    /// On success the `LeaseKeeper` and `LeaseKeepAliveStream` pair produced
    /// by `LeaseClient::keep_alive` is returned.
    #[inline]
    pub async fn lease_keep_alive(
        &mut self,
        id: i64,
    ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
        self.lease.keep_alive(id).await
    }
481
    /// Forwards a time-to-live query for lease `id` to the internal lease
    /// client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `LeaseClient::time_to_live` is returned as-is.
    #[inline]
    pub async fn lease_time_to_live(
        &mut self,
        id: i64,
        options: Option<LeaseTimeToLiveOptions>,
    ) -> Result<LeaseTimeToLiveResponse> {
        self.lease.time_to_live(id, options).await
    }
491
    /// Forwards a lease-listing request to the internal lease client and
    /// returns the result of `LeaseClient::leases`.
    #[inline]
    pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
        self.lease.leases().await
    }
497
    /// Forwards a lock request for `name` to the internal lock client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `LockClient::lock` is returned as-is.
    #[inline]
    pub async fn lock(
        &mut self,
        name: impl Into<Vec<u8>>,
        options: Option<LockOptions>,
    ) -> Result<LockResponse> {
        self.lock.lock(name, options).await
    }
512
    /// Forwards an unlock request for `key` to the internal lock client and
    /// returns the result of `LockClient::unlock`.
    #[inline]
    pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
        self.lock.unlock(key).await
    }
520
    /// Forwards an auth-enable request to the internal auth client and
    /// returns the result of `AuthClient::auth_enable`.
    #[inline]
    pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
        self.auth.auth_enable().await
    }
526
    /// Forwards an auth-disable request to the internal auth client and
    /// returns the result of `AuthClient::auth_disable`.
    #[inline]
    pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
        self.auth.auth_disable().await
    }
532
    /// Forwards a role-add request for `name` to the internal auth client
    /// and returns the result of `AuthClient::role_add`.
    #[inline]
    pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
        self.auth.role_add(name).await
    }
538
    /// Forwards a role-delete request for `name` to the internal auth client
    /// and returns the result of `AuthClient::role_delete`.
    #[inline]
    pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
        self.auth.role_delete(name).await
    }
544
    /// Forwards a role-get request for `name` to the internal auth client
    /// and returns the result of `AuthClient::role_get`.
    #[inline]
    pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
        self.auth.role_get(name).await
    }
550
    /// Forwards a role-list request to the internal auth client and returns
    /// the result of `AuthClient::role_list`.
    #[inline]
    pub async fn role_list(&mut self) -> Result<RoleListResponse> {
        self.auth.role_list().await
    }
556
    /// Forwards a request to grant `perm` to role `name` to the internal
    /// auth client and returns the result of
    /// `AuthClient::role_grant_permission`.
    #[inline]
    pub async fn role_grant_permission(
        &mut self,
        name: impl Into<String>,
        perm: Permission,
    ) -> Result<RoleGrantPermissionResponse> {
        self.auth.role_grant_permission(name, perm).await
    }
566
    /// Forwards a request to revoke the permission on `key` from role `name`
    /// to the internal auth client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `AuthClient::role_revoke_permission` is returned as-is.
    #[inline]
    pub async fn role_revoke_permission(
        &mut self,
        name: impl Into<String>,
        key: impl Into<Vec<u8>>,
        options: Option<RoleRevokePermissionOptions>,
    ) -> Result<RoleRevokePermissionResponse> {
        self.auth.role_revoke_permission(name, key, options).await
    }
577
    /// Forwards a user-add request (`name`, `password`) to the internal auth
    /// client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `AuthClient::user_add` is returned as-is.
    #[inline]
    pub async fn user_add(
        &mut self,
        name: impl Into<String>,
        password: impl Into<String>,
        options: Option<UserAddOptions>,
    ) -> Result<UserAddResponse> {
        self.auth.user_add(name, password, options).await
    }
588
    /// Forwards a user-get request for `name` to the internal auth client
    /// and returns the result of `AuthClient::user_get`.
    #[inline]
    pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
        self.auth.user_get(name).await
    }
594
    /// Forwards a user-list request to the internal auth client and returns
    /// the result of `AuthClient::user_list`.
    #[inline]
    pub async fn user_list(&mut self) -> Result<UserListResponse> {
        self.auth.user_list().await
    }
600
    /// Forwards a user-delete request for `name` to the internal auth client
    /// and returns the result of `AuthClient::user_delete`.
    #[inline]
    pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
        self.auth.user_delete(name).await
    }
606
    /// Forwards a change-password request for user `name` with the new
    /// `password` to the internal auth client and returns the result of
    /// `AuthClient::user_change_password`.
    #[inline]
    pub async fn user_change_password(
        &mut self,
        name: impl Into<String>,
        password: impl Into<String>,
    ) -> Result<UserChangePasswordResponse> {
        self.auth.user_change_password(name, password).await
    }
616
    /// Forwards a request to grant `role` to `user` to the internal auth
    /// client and returns the result of `AuthClient::user_grant_role`.
    #[inline]
    pub async fn user_grant_role(
        &mut self,
        user: impl Into<String>,
        role: impl Into<String>,
    ) -> Result<UserGrantRoleResponse> {
        self.auth.user_grant_role(user, role).await
    }
626
    /// Forwards a request to revoke `role` from `user` to the internal auth
    /// client and returns the result of `AuthClient::user_revoke_role`.
    #[inline]
    pub async fn user_revoke_role(
        &mut self,
        user: impl Into<String>,
        role: impl Into<String>,
    ) -> Result<UserRevokeRoleResponse> {
        self.auth.user_revoke_role(user, role).await
    }
636
    /// Forwards an alarm request (`alarm_action` applied to `alarm_type`) to
    /// the internal maintenance client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `MaintenanceClient::alarm` is returned as-is.
    #[inline]
    pub async fn alarm(
        &mut self,
        alarm_action: AlarmAction,
        alarm_type: AlarmType,
        options: Option<AlarmOptions>,
    ) -> Result<AlarmResponse> {
        self.maintenance
            .alarm(alarm_action, alarm_type, options)
            .await
    }
649
    /// Forwards a status request to the internal maintenance client and
    /// returns the result of `MaintenanceClient::status`.
    #[inline]
    pub async fn status(&mut self) -> Result<StatusResponse> {
        self.maintenance.status().await
    }
655
    /// Forwards a defragment request to the internal maintenance client and
    /// returns the result of `MaintenanceClient::defragment`.
    #[inline]
    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
        self.maintenance.defragment().await
    }
661
    /// Forwards a hash request to the internal maintenance client and
    /// returns the result of `MaintenanceClient::hash`.
    #[inline]
    pub async fn hash(&mut self) -> Result<HashResponse> {
        self.maintenance.hash().await
    }
669
    /// Forwards a hash-kv request for `revision` to the internal maintenance
    /// client and returns the result of `MaintenanceClient::hash_kv`.
    #[inline]
    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
        self.maintenance.hash_kv(revision).await
    }
676
    /// Forwards a snapshot request to the internal maintenance client and
    /// returns the `SnapshotStreaming` value produced by
    /// `MaintenanceClient::snapshot`.
    #[inline]
    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
        self.maintenance.snapshot().await
    }
682
683 #[inline]
685 pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
686 &mut self,
687 urls: S,
688 options: Option<MemberAddOptions>,
689 ) -> Result<MemberAddResponse> {
690 let mut eps = Vec::new();
691 for e in urls.as_ref() {
692 let e = e.as_ref();
693 let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
694 e.to_string()
695 } else {
696 HTTP_PREFIX.to_owned() + e
697 };
698 eps.push(url);
699 }
700
701 self.cluster.member_add(eps, options).await
702 }
703
    /// Forwards a member-remove request for member `id` to the internal
    /// cluster client and returns the result of
    /// `ClusterClient::member_remove`.
    #[inline]
    pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
        self.cluster.member_remove(id).await
    }
709
    /// Forwards a member-update request for member `id` with the given `url`
    /// list to the internal cluster client and returns the result of
    /// `ClusterClient::member_update`.
    ///
    /// Unlike `member_add`, the URLs are passed through without adding a
    /// scheme prefix.
    #[inline]
    pub async fn member_update(
        &mut self,
        id: u64,
        url: impl Into<Vec<String>>,
    ) -> Result<MemberUpdateResponse> {
        self.cluster.member_update(id, url).await
    }
719
    /// Forwards a member-promote request for member `id` to the internal
    /// cluster client and returns the result of
    /// `ClusterClient::member_promote`.
    #[inline]
    pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
        self.cluster.member_promote(id).await
    }
725
    /// Forwards a member-list request to the internal cluster client and
    /// returns the result of `ClusterClient::member_list`.
    #[inline]
    pub async fn member_list(&mut self) -> Result<MemberListResponse> {
        self.cluster.member_list().await
    }
731
    /// Forwards a move-leader request targeting `target_id` to the internal
    /// maintenance client and returns the result of
    /// `MaintenanceClient::move_leader`.
    #[inline]
    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
        self.maintenance.move_leader(target_id).await
    }
737
    /// Forwards a campaign request (`name`, `value`, `lease`) to the
    /// internal election client and returns the result of
    /// `ElectionClient::campaign`.
    #[inline]
    pub async fn campaign(
        &mut self,
        name: impl Into<Vec<u8>>,
        value: impl Into<Vec<u8>>,
        lease: i64,
    ) -> Result<CampaignResponse> {
        self.election.campaign(name, value, lease).await
    }
750
    /// Forwards a proclaim request carrying `value` to the internal election
    /// client.
    ///
    /// `options` is passed through unchanged; the result of
    /// `ElectionClient::proclaim` is returned as-is.
    #[inline]
    pub async fn proclaim(
        &mut self,
        value: impl Into<Vec<u8>>,
        options: Option<ProclaimOptions>,
    ) -> Result<ProclaimResponse> {
        self.election.proclaim(value, options).await
    }
760
    /// Forwards a leader query for the election `name` to the internal
    /// election client and returns the result of `ElectionClient::leader`.
    #[inline]
    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
        self.election.leader(name).await
    }
766
    /// Forwards an observe request for the election `name` to the internal
    /// election client and returns the `ObserveStream` produced by
    /// `ElectionClient::observe`.
    #[inline]
    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
        self.election.observe(name).await
    }
773
    /// Forwards a resign request to the internal election client.
    ///
    /// `option` is passed through unchanged; the result of
    /// `ElectionClient::resign` is returned as-is.
    #[inline]
    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
        self.election.resign(option).await
    }
779
    /// Forwards `name` and `password` to `AuthClient::set_client_auth` on
    /// the internal auth client and returns its result.
    ///
    /// NOTE(review): presumably this re-authenticates and replaces the token
    /// shared by all sub-clients; confirm against `AuthClient`.
    pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
        self.auth.set_client_auth(name, password).await
    }
784
    /// Calls `AuthClient::remove_client_auth` on the internal auth client.
    ///
    /// NOTE(review): presumably this clears the token shared by all
    /// sub-clients; confirm against `AuthClient`.
    pub fn remove_client_auth(&mut self) {
        self.auth.remove_client_auth();
    }
789}
790
/// Options applied when a `Client` is created.
///
/// Start from [`ConnectOptions::new`] (or `Default::default()`, which yields
/// the same value) and refine with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ConnectOptions {
    /// User name and password used to authenticate while connecting.
    user: Option<(String, String)>,
    /// HTTP/2 keep-alive `(interval, timeout)` applied to each endpoint.
    keep_alive: Option<(Duration, Duration)>,
    /// Passed to the endpoint's `keep_alive_while_idle`; only applied when
    /// `keep_alive` is set.
    keep_alive_while_idle: bool,
    /// Passed to the endpoint's `timeout` when set.
    timeout: Option<Duration>,
    /// Passed to the endpoint's `connect_timeout` when set.
    connect_timeout: Option<Duration>,
    /// Passed to the endpoint's `tcp_keepalive` when set.
    tcp_keepalive: Option<Duration>,
    /// TLS options used for HTTPS endpoints.
    #[cfg(feature = "tls")]
    tls: Option<TlsOptions>,
    /// Result of building the OpenSSL connector; an error stored here
    /// surfaces when connecting.
    #[cfg(feature = "tls-openssl")]
    otls: Option<OpenSslResult<OpenSslConnector>>,
    /// Value handed to the request interceptor's `require_leader` field.
    require_leader: bool,
}

impl Default for ConnectOptions {
    /// Returns the same value as [`ConnectOptions::new`].
    ///
    /// Implemented by hand because a derived `Default` would set
    /// `keep_alive_while_idle` to `false`, disagreeing with `new()`.
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl ConnectOptions {
    /// Sets the user name and password used to authenticate while connecting.
    #[inline]
    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
        self.user = Some((name.into(), password.into()));
        self
    }

    /// Sets the TLS options used for HTTPS endpoints.
    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
    #[cfg(feature = "tls")]
    #[inline]
    pub fn with_tls(mut self, tls: TlsOptions) -> Self {
        self.tls = Some(tls);
        self
    }

    /// Builds an OpenSSL connector from `otls` and stores the result.
    ///
    /// A build failure is kept as-is and reported when connecting.
    #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
    #[cfg(feature = "tls-openssl")]
    #[inline]
    pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
        self.otls = Some(otls.build());
        self
    }

    /// Enables HTTP/2 keep-alive with the given `interval` and `timeout`.
    #[inline]
    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
        self.keep_alive = Some((interval, timeout));
        self
    }

    /// Sets the timeout passed to each endpoint's `timeout`.
    #[inline]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Sets the timeout passed to each endpoint's `connect_timeout`.
    #[inline]
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = Some(timeout);
        self
    }

    /// Sets the duration passed to each endpoint's `tcp_keepalive`.
    #[inline]
    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
        self.tcp_keepalive = Some(tcp_keepalive);
        self
    }

    /// Sets the flag passed to each endpoint's `keep_alive_while_idle`.
    ///
    /// The flag only takes effect when keep-alive has been configured with
    /// [`ConnectOptions::with_keep_alive`].
    #[inline]
    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
        self.keep_alive_while_idle = enabled;
        self
    }

    /// Sets the `require_leader` flag handed to the request interceptor.
    #[inline]
    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
        self.require_leader = require_leader;
        self
    }

    /// Creates options with nothing configured, `keep_alive_while_idle`
    /// set to `true` and `require_leader` set to `false`.
    #[inline]
    pub const fn new() -> Self {
        ConnectOptions {
            user: None,
            keep_alive: None,
            keep_alive_while_idle: true,
            timeout: None,
            connect_timeout: None,
            tcp_keepalive: None,
            #[cfg(feature = "tls")]
            tls: None,
            #[cfg(feature = "tls-openssl")]
            otls: None,
            require_leader: false,
        }
    }
}