1#[cfg(not(feature = "tls-openssl"))]
4use crate::channel::Channel;
5use crate::error::{Error, Result};
6use crate::intercept::{InterceptedChannel, Interceptor};
7use crate::lock::RwLockExt;
8#[cfg(feature = "tls-openssl")]
9use crate::openssl_tls::{self, OpenSslClientConfig, OpenSslConnector};
10use crate::rpc::auth::Permission;
11use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
12use crate::rpc::auth::{
13 RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
14 RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
15 UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
16 UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
17};
18use crate::rpc::cluster::{
19 ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
20 MemberRemoveResponse, MemberUpdateResponse,
21};
22use crate::rpc::election::{
23 CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
24 ProclaimResponse, ResignOptions, ResignResponse,
25};
26use crate::rpc::kv::{
27 CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
28 KvClient, PutOptions, PutResponse, Txn, TxnResponse,
29};
30use crate::rpc::lease::{
31 LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
32 LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
33};
34use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
35use crate::rpc::maintenance::{
36 AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
37 HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
38};
39use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
40#[cfg(feature = "tls-openssl")]
41use crate::OpenSslResult;
42#[cfg(feature = "tls")]
43use crate::TlsOptions;
44use http::uri::Uri;
45use http::HeaderValue;
46
47use std::str::FromStr;
48use std::sync::{Arc, RwLock};
49use std::time::Duration;
50use tokio::sync::mpsc::Sender;
51
52use tonic::transport::Endpoint;
53
54use tower::discover::Change;
55
56const HTTP_PREFIX: &str = "http://";
57const HTTPS_PREFIX: &str = "https://";
58
59#[derive(Clone)]
61pub struct Client {
62 kv: KvClient,
63 watch: WatchClient,
64 lease: LeaseClient,
65 lock: LockClient,
66 auth: AuthClient,
67 maintenance: MaintenanceClient,
68 cluster: ClusterClient,
69 election: ElectionClient,
70 options: Option<ConnectOptions>,
71 tx: Sender<Change<Uri, Endpoint>>,
72}
73
74impl Client {
75 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
77 endpoints: S,
78 options: Option<ConnectOptions>,
79 ) -> Result<Self> {
80 let endpoints = {
81 let mut eps = Vec::new();
82 for e in endpoints.as_ref() {
83 let channel = Self::build_endpoint(e.as_ref(), &options)?;
84 eps.push(channel);
85 }
86 eps
87 };
88
89 if endpoints.is_empty() {
90 return Err(Error::InvalidArgs(String::from("empty endpoints")));
91 }
92
93 #[cfg(not(feature = "tls-openssl"))]
95 let (channel, tx) = Channel::balance_channel(64);
96 #[cfg(feature = "tls-openssl")]
97 let (channel, tx) = openssl_tls::balanced_channel(
98 options
99 .clone()
100 .and_then(|o| o.otls)
101 .unwrap_or_else(OpenSslConnector::create_default)?,
102 )?;
103 let channel = InterceptedChannel::new(
104 channel,
105 Interceptor {
106 require_leader: options.as_ref().map(|o| o.require_leader).unwrap_or(false),
107 },
108 );
109 for endpoint in endpoints {
110 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
112 .await
113 .unwrap();
114 }
115
116 let mut options = options;
117
118 let auth_token = Arc::new(RwLock::new(None));
119 Self::auth(channel.clone(), &mut options, &auth_token).await?;
120
121 Ok(Self::build_client(channel, tx, auth_token, options))
122 }
123
124 fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
125 #[cfg(feature = "tls-openssl")]
126 use tonic::transport::Channel;
127 let mut endpoint = if url.starts_with(HTTP_PREFIX) {
128 #[cfg(feature = "tls")]
129 if let Some(connect_options) = options {
130 if connect_options.tls.is_some() {
131 return Err(Error::InvalidArgs(String::from(
132 "TLS options are only supported with HTTPS URLs",
133 )));
134 }
135 }
136
137 Channel::builder(url.parse()?)
138 } else if url.starts_with(HTTPS_PREFIX) {
139 #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
140 return Err(Error::InvalidArgs(String::from(
141 "HTTPS URLs are only supported with the feature \"tls\"",
142 )));
143
144 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
145 {
146 Channel::builder(url.parse()?)
147 }
148
149 #[cfg(feature = "tls")]
150 {
151 let tls = if let Some(connect_options) = options {
152 connect_options.tls.clone()
153 } else {
154 None
155 }
156 .unwrap_or_else(TlsOptions::new);
157
158 Channel::builder(url.parse()?).tls_config(tls)?
159 }
160 } else {
161 #[cfg(feature = "tls")]
162 {
163 let tls = if let Some(connect_options) = options {
164 connect_options.tls.clone()
165 } else {
166 None
167 };
168
169 match tls {
170 Some(tls) => {
171 let e = HTTPS_PREFIX.to_owned() + url;
172 Channel::builder(e.parse()?).tls_config(tls)?
173 }
174 None => {
175 let e = HTTP_PREFIX.to_owned() + url;
176 Channel::builder(e.parse()?)
177 }
178 }
179 }
180
181 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
182 {
183 let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
184 HTTPS_PREFIX
185 } else {
186 HTTP_PREFIX
187 };
188 let e = pfx.to_owned() + url;
189 Channel::builder(e.parse()?)
190 }
191
192 #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
193 {
194 let e = HTTP_PREFIX.to_owned() + url;
195 Channel::builder(e.parse()?)
196 }
197 };
198
199 if let Some(opts) = options {
200 if let Some((interval, timeout)) = opts.keep_alive {
201 endpoint = endpoint
202 .keep_alive_while_idle(opts.keep_alive_while_idle)
203 .http2_keep_alive_interval(interval)
204 .keep_alive_timeout(timeout);
205 }
206
207 if let Some(timeout) = opts.timeout {
208 endpoint = endpoint.timeout(timeout);
209 }
210
211 if let Some(timeout) = opts.connect_timeout {
212 endpoint = endpoint.connect_timeout(timeout);
213 }
214
215 if let Some(tcp_keepalive) = opts.tcp_keepalive {
216 endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
217 }
218 }
219
220 Ok(endpoint)
221 }
222
223 async fn auth(
224 channel: InterceptedChannel,
225 options: &mut Option<ConnectOptions>,
226 auth_token: &Arc<RwLock<Option<HeaderValue>>>,
227 ) -> Result<()> {
228 let user = match options {
229 None => return Ok(()),
230 Some(opt) => {
231 opt.user.take()
233 }
234 };
235
236 if let Some((name, password)) = user {
237 let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
238 let resp = tmp_auth.authenticate(name, password).await?;
239 auth_token.write_unpoisoned().replace(resp.token().parse()?);
240 }
241
242 Ok(())
243 }
244
245 fn build_client(
246 channel: InterceptedChannel,
247 tx: Sender<Change<Uri, Endpoint>>,
248 auth_token: Arc<RwLock<Option<HeaderValue>>>,
249 options: Option<ConnectOptions>,
250 ) -> Self {
251 let kv = KvClient::new(channel.clone(), auth_token.clone());
252 let watch = WatchClient::new(channel.clone(), auth_token.clone());
253 let lease = LeaseClient::new(channel.clone(), auth_token.clone());
254 let lock = LockClient::new(channel.clone(), auth_token.clone());
255 let auth = AuthClient::new(channel.clone(), auth_token.clone());
256 let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
257 let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
258 let election = ElectionClient::new(channel, auth_token);
259
260 Self {
261 kv,
262 watch,
263 lease,
264 lock,
265 auth,
266 maintenance,
267 cluster,
268 election,
269 options,
270 tx,
271 }
272 }
273
274 #[inline]
285 pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
286 let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
287 let tx = &self.tx;
288 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
289 .await
290 .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {}", e)))
291 }
292
293 #[inline]
299 pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
300 let uri = http::Uri::from_str(endpoint.as_ref())?;
301 let tx = &self.tx;
302 tx.send(Change::Remove(uri)).await.map_err(|e| {
303 Error::EndpointError(format!("failed to remove endpoint because of {}", e))
304 })
305 }
306
307 #[inline]
309 pub fn kv_client(&self) -> KvClient {
310 self.kv.clone()
311 }
312
313 #[inline]
315 pub fn watch_client(&self) -> WatchClient {
316 self.watch.clone()
317 }
318
319 #[inline]
321 pub fn lease_client(&self) -> LeaseClient {
322 self.lease.clone()
323 }
324
325 #[inline]
327 pub fn auth_client(&self) -> AuthClient {
328 self.auth.clone()
329 }
330
331 #[inline]
333 pub fn maintenance_client(&self) -> MaintenanceClient {
334 self.maintenance.clone()
335 }
336
337 #[inline]
339 pub fn cluster_client(&self) -> ClusterClient {
340 self.cluster.clone()
341 }
342
343 #[inline]
345 pub fn lock_client(&self) -> LockClient {
346 self.lock.clone()
347 }
348
349 #[inline]
351 pub fn election_client(&self) -> ElectionClient {
352 self.election.clone()
353 }
354
355 #[inline]
359 pub async fn put(
360 &mut self,
361 key: impl Into<Vec<u8>>,
362 value: impl Into<Vec<u8>>,
363 options: Option<PutOptions>,
364 ) -> Result<PutResponse> {
365 self.kv.put(key, value, options).await
366 }
367
368 #[inline]
370 pub async fn get(
371 &mut self,
372 key: impl Into<Vec<u8>>,
373 options: Option<GetOptions>,
374 ) -> Result<GetResponse> {
375 self.kv.get(key, options).await
376 }
377
378 #[inline]
380 pub async fn delete(
381 &mut self,
382 key: impl Into<Vec<u8>>,
383 options: Option<DeleteOptions>,
384 ) -> Result<DeleteResponse> {
385 self.kv.delete(key, options).await
386 }
387
388 #[inline]
392 pub async fn compact(
393 &mut self,
394 revision: i64,
395 options: Option<CompactionOptions>,
396 ) -> Result<CompactionResponse> {
397 self.kv.compact(revision, options).await
398 }
399
400 #[inline]
405 pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
406 self.kv.txn(txn).await
407 }
408
409 #[inline]
414 pub async fn watch(
415 &mut self,
416 key: impl Into<Vec<u8>>,
417 options: Option<WatchOptions>,
418 ) -> Result<(Watcher, WatchStream)> {
419 self.watch.watch(key, options).await
420 }
421
422 #[inline]
426 pub async fn lease_grant(
427 &mut self,
428 ttl: i64,
429 options: Option<LeaseGrantOptions>,
430 ) -> Result<LeaseGrantResponse> {
431 self.lease.grant(ttl, options).await
432 }
433
434 #[inline]
436 pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
437 self.lease.revoke(id).await
438 }
439
440 #[inline]
443 pub async fn lease_keep_alive(
444 &mut self,
445 id: i64,
446 ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
447 self.lease.keep_alive(id).await
448 }
449
450 #[inline]
452 pub async fn lease_time_to_live(
453 &mut self,
454 id: i64,
455 options: Option<LeaseTimeToLiveOptions>,
456 ) -> Result<LeaseTimeToLiveResponse> {
457 self.lease.time_to_live(id, options).await
458 }
459
460 #[inline]
462 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
463 self.lease.leases().await
464 }
465
466 #[inline]
473 pub async fn lock(
474 &mut self,
475 name: impl Into<Vec<u8>>,
476 options: Option<LockOptions>,
477 ) -> Result<LockResponse> {
478 self.lock.lock(name, options).await
479 }
480
481 #[inline]
485 pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
486 self.lock.unlock(key).await
487 }
488
489 #[inline]
491 pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
492 self.auth.auth_enable().await
493 }
494
495 #[inline]
497 pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
498 self.auth.auth_disable().await
499 }
500
501 #[inline]
503 pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
504 self.auth.role_add(name).await
505 }
506
507 #[inline]
509 pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
510 self.auth.role_delete(name).await
511 }
512
513 #[inline]
515 pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
516 self.auth.role_get(name).await
517 }
518
519 #[inline]
521 pub async fn role_list(&mut self) -> Result<RoleListResponse> {
522 self.auth.role_list().await
523 }
524
525 #[inline]
527 pub async fn role_grant_permission(
528 &mut self,
529 name: impl Into<String>,
530 perm: Permission,
531 ) -> Result<RoleGrantPermissionResponse> {
532 self.auth.role_grant_permission(name, perm).await
533 }
534
535 #[inline]
537 pub async fn role_revoke_permission(
538 &mut self,
539 name: impl Into<String>,
540 key: impl Into<Vec<u8>>,
541 options: Option<RoleRevokePermissionOptions>,
542 ) -> Result<RoleRevokePermissionResponse> {
543 self.auth.role_revoke_permission(name, key, options).await
544 }
545
546 #[inline]
548 pub async fn user_add(
549 &mut self,
550 name: impl Into<String>,
551 password: impl Into<String>,
552 options: Option<UserAddOptions>,
553 ) -> Result<UserAddResponse> {
554 self.auth.user_add(name, password, options).await
555 }
556
557 #[inline]
559 pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
560 self.auth.user_get(name).await
561 }
562
563 #[inline]
565 pub async fn user_list(&mut self) -> Result<UserListResponse> {
566 self.auth.user_list().await
567 }
568
569 #[inline]
571 pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
572 self.auth.user_delete(name).await
573 }
574
575 #[inline]
577 pub async fn user_change_password(
578 &mut self,
579 name: impl Into<String>,
580 password: impl Into<String>,
581 ) -> Result<UserChangePasswordResponse> {
582 self.auth.user_change_password(name, password).await
583 }
584
585 #[inline]
587 pub async fn user_grant_role(
588 &mut self,
589 user: impl Into<String>,
590 role: impl Into<String>,
591 ) -> Result<UserGrantRoleResponse> {
592 self.auth.user_grant_role(user, role).await
593 }
594
595 #[inline]
597 pub async fn user_revoke_role(
598 &mut self,
599 user: impl Into<String>,
600 role: impl Into<String>,
601 ) -> Result<UserRevokeRoleResponse> {
602 self.auth.user_revoke_role(user, role).await
603 }
604
605 #[inline]
607 pub async fn alarm(
608 &mut self,
609 alarm_action: AlarmAction,
610 alarm_type: AlarmType,
611 options: Option<AlarmOptions>,
612 ) -> Result<AlarmResponse> {
613 self.maintenance
614 .alarm(alarm_action, alarm_type, options)
615 .await
616 }
617
618 #[inline]
620 pub async fn status(&mut self) -> Result<StatusResponse> {
621 self.maintenance.status().await
622 }
623
624 #[inline]
626 pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
627 self.maintenance.defragment().await
628 }
629
630 #[inline]
634 pub async fn hash(&mut self) -> Result<HashResponse> {
635 self.maintenance.hash().await
636 }
637
638 #[inline]
641 pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
642 self.maintenance.hash_kv(revision).await
643 }
644
645 #[inline]
647 pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
648 self.maintenance.snapshot().await
649 }
650
651 #[inline]
653 pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
654 &mut self,
655 urls: S,
656 options: Option<MemberAddOptions>,
657 ) -> Result<MemberAddResponse> {
658 let mut eps = Vec::new();
659 for e in urls.as_ref() {
660 let e = e.as_ref();
661 let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
662 e.to_string()
663 } else {
664 HTTP_PREFIX.to_owned() + e
665 };
666 eps.push(url);
667 }
668
669 self.cluster.member_add(eps, options).await
670 }
671
672 #[inline]
674 pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
675 self.cluster.member_remove(id).await
676 }
677
678 #[inline]
680 pub async fn member_update(
681 &mut self,
682 id: u64,
683 url: impl Into<Vec<String>>,
684 ) -> Result<MemberUpdateResponse> {
685 self.cluster.member_update(id, url).await
686 }
687
688 #[inline]
690 pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
691 self.cluster.member_promote(id).await
692 }
693
694 #[inline]
696 pub async fn member_list(&mut self) -> Result<MemberListResponse> {
697 self.cluster.member_list().await
698 }
699
700 #[inline]
702 pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
703 self.maintenance.move_leader(target_id).await
704 }
705
706 #[inline]
710 pub async fn campaign(
711 &mut self,
712 name: impl Into<Vec<u8>>,
713 value: impl Into<Vec<u8>>,
714 lease: i64,
715 ) -> Result<CampaignResponse> {
716 self.election.campaign(name, value, lease).await
717 }
718
719 #[inline]
721 pub async fn proclaim(
722 &mut self,
723 value: impl Into<Vec<u8>>,
724 options: Option<ProclaimOptions>,
725 ) -> Result<ProclaimResponse> {
726 self.election.proclaim(value, options).await
727 }
728
729 #[inline]
731 pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
732 self.election.leader(name).await
733 }
734
735 #[inline]
738 pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
739 self.election.observe(name).await
740 }
741
742 #[inline]
744 pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
745 self.election.resign(option).await
746 }
747
748 pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
750 self.auth.set_client_auth(name, password).await
751 }
752
753 pub fn remove_client_auth(&mut self) {
755 self.auth.remove_client_auth();
756 }
757}
758
759#[derive(Debug, Default, Clone)]
761pub struct ConnectOptions {
762 user: Option<(String, String)>,
764 keep_alive: Option<(Duration, Duration)>,
766 keep_alive_while_idle: bool,
768 timeout: Option<Duration>,
770 connect_timeout: Option<Duration>,
772 tcp_keepalive: Option<Duration>,
774 #[cfg(feature = "tls")]
775 tls: Option<TlsOptions>,
776 #[cfg(feature = "tls-openssl")]
777 otls: Option<OpenSslResult<OpenSslConnector>>,
778 require_leader: bool,
780}
781
782impl ConnectOptions {
783 #[inline]
785 pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
786 self.user = Some((name.into(), password.into()));
787 self
788 }
789
790 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
794 #[cfg(feature = "tls")]
795 #[inline]
796 pub fn with_tls(mut self, tls: TlsOptions) -> Self {
797 self.tls = Some(tls);
798 self
799 }
800
801 #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
803 #[cfg(feature = "tls-openssl")]
804 #[inline]
805 pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
806 self.otls = Some(otls.build());
811 self
812 }
813
814 #[inline]
816 pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
817 self.keep_alive = Some((interval, timeout));
818 self
819 }
820
821 #[inline]
823 pub fn with_timeout(mut self, timeout: Duration) -> Self {
824 self.timeout = Some(timeout);
825 self
826 }
827
828 #[inline]
830 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
831 self.connect_timeout = Some(timeout);
832 self
833 }
834
835 #[inline]
837 pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
838 self.tcp_keepalive = Some(tcp_keepalive);
839 self
840 }
841
842 #[inline]
848 pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
849 self.keep_alive_while_idle = enabled;
850 self
851 }
852
853 #[inline]
855 pub fn with_require_leader(mut self, require_leader: bool) -> Self {
856 self.require_leader = require_leader;
857 self
858 }
859
860 #[inline]
862 pub const fn new() -> Self {
863 ConnectOptions {
864 user: None,
865 keep_alive: None,
866 keep_alive_while_idle: true,
867 timeout: None,
868 connect_timeout: None,
869 tcp_keepalive: None,
870 #[cfg(feature = "tls")]
871 tls: None,
872 #[cfg(feature = "tls-openssl")]
873 otls: None,
874 require_leader: false,
875 }
876 }
877}