1use crate::channel::Channel;
4use crate::error::{Error, Result};
5#[cfg(feature = "tls-openssl")]
6use crate::openssl_tls::{self, OpenSslClientConfig, OpenSslConnector};
7use crate::rpc::auth::Permission;
8use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
9use crate::rpc::auth::{
10 RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
11 RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
12 UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
13 UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
14};
15use crate::rpc::cluster::{
16 ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
17 MemberRemoveResponse, MemberUpdateResponse,
18};
19use crate::rpc::election::{
20 CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
21 ProclaimResponse, ResignOptions, ResignResponse,
22};
23use crate::rpc::kv::{
24 CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
25 KvClient, PutOptions, PutResponse, Txn, TxnResponse,
26};
27use crate::rpc::lease::{
28 LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
29 LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
30};
31use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
32use crate::rpc::maintenance::{
33 AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
34 HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
35};
36use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
37#[cfg(feature = "tls-openssl")]
38use crate::OpenSslResult;
39#[cfg(feature = "tls")]
40use crate::TlsOptions;
41use http::uri::Uri;
42use http::HeaderValue;
43
44use std::str::FromStr;
45use std::sync::{Arc, RwLock};
46use std::time::Duration;
47use tokio::sync::mpsc::Sender;
48
49use tonic::transport::Endpoint;
50
51use tower::discover::Change;
52
53const HTTP_PREFIX: &str = "http://";
54const HTTPS_PREFIX: &str = "https://";
55
56#[derive(Clone)]
58pub struct Client {
59 kv: KvClient,
60 watch: WatchClient,
61 lease: LeaseClient,
62 lock: LockClient,
63 auth: AuthClient,
64 maintenance: MaintenanceClient,
65 cluster: ClusterClient,
66 election: ElectionClient,
67 options: Option<ConnectOptions>,
68 tx: Sender<Change<Uri, Endpoint>>,
69}
70
71impl Client {
72 pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
74 endpoints: S,
75 options: Option<ConnectOptions>,
76 ) -> Result<Self> {
77 let endpoints = {
78 let mut eps = Vec::new();
79 for e in endpoints.as_ref() {
80 let channel = Self::build_endpoint(e.as_ref(), &options)?;
81 eps.push(channel);
82 }
83 eps
84 };
85
86 if endpoints.is_empty() {
87 return Err(Error::InvalidArgs(String::from("empty endpoints")));
88 }
89
90 #[cfg(not(feature = "tls-openssl"))]
92 let (channel, tx) = Channel::balance_channel(64);
93 #[cfg(feature = "tls-openssl")]
94 let (channel, tx) = openssl_tls::balanced_channel(
95 options
96 .clone()
97 .and_then(|o| o.otls)
98 .unwrap_or_else(OpenSslConnector::create_default)?,
99 )?;
100 for endpoint in endpoints {
101 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
103 .await
104 .unwrap();
105 }
106
107 let mut options = options;
108
109 let auth_token = Arc::new(RwLock::new(None));
110 Self::auth(channel.clone(), &mut options, &auth_token).await?;
111
112 Ok(Self::build_client(channel, tx, auth_token, options))
113 }
114
115 fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
116 #[cfg(feature = "tls-openssl")]
117 use tonic::transport::Channel;
118 let mut endpoint = if url.starts_with(HTTP_PREFIX) {
119 #[cfg(feature = "tls")]
120 if let Some(connect_options) = options {
121 if connect_options.tls.is_some() {
122 return Err(Error::InvalidArgs(String::from(
123 "TLS options are only supported with HTTPS URLs",
124 )));
125 }
126 }
127
128 Channel::builder(url.parse()?)
129 } else if url.starts_with(HTTPS_PREFIX) {
130 #[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
131 return Err(Error::InvalidArgs(String::from(
132 "HTTPS URLs are only supported with the feature \"tls\"",
133 )));
134
135 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
136 {
137 Channel::builder(url.parse()?)
138 }
139
140 #[cfg(feature = "tls")]
141 {
142 let tls = if let Some(connect_options) = options {
143 connect_options.tls.clone()
144 } else {
145 None
146 }
147 .unwrap_or_else(TlsOptions::new);
148
149 Channel::builder(url.parse()?).tls_config(tls)?
150 }
151 } else {
152 #[cfg(feature = "tls")]
153 {
154 let tls = if let Some(connect_options) = options {
155 connect_options.tls.clone()
156 } else {
157 None
158 };
159
160 match tls {
161 Some(tls) => {
162 let e = HTTPS_PREFIX.to_owned() + url;
163 Channel::builder(e.parse()?).tls_config(tls)?
164 }
165 None => {
166 let e = HTTP_PREFIX.to_owned() + url;
167 Channel::builder(e.parse()?)
168 }
169 }
170 }
171
172 #[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
173 {
174 let pfx = if options.as_ref().and_then(|o| o.otls.as_ref()).is_some() {
175 HTTPS_PREFIX
176 } else {
177 HTTP_PREFIX
178 };
179 let e = pfx.to_owned() + url;
180 Channel::builder(e.parse()?)
181 }
182
183 #[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
184 {
185 let e = HTTP_PREFIX.to_owned() + url;
186 Channel::builder(e.parse()?)
187 }
188 };
189
190 if let Some(opts) = options {
191 if let Some((interval, timeout)) = opts.keep_alive {
192 endpoint = endpoint
193 .keep_alive_while_idle(opts.keep_alive_while_idle)
194 .http2_keep_alive_interval(interval)
195 .keep_alive_timeout(timeout);
196 }
197
198 if let Some(timeout) = opts.timeout {
199 endpoint = endpoint.timeout(timeout);
200 }
201
202 if let Some(timeout) = opts.connect_timeout {
203 endpoint = endpoint.connect_timeout(timeout);
204 }
205
206 if let Some(tcp_keepalive) = opts.tcp_keepalive {
207 endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
208 }
209 }
210
211 Ok(endpoint)
212 }
213
214 async fn auth(
215 channel: Channel,
216 options: &mut Option<ConnectOptions>,
217 auth_token: &Arc<RwLock<Option<HeaderValue>>>,
218 ) -> Result<()> {
219 let user = match options {
220 None => return Ok(()),
221 Some(opt) => {
222 opt.user.take()
224 }
225 };
226
227 if let Some((name, password)) = user {
228 let mut tmp_auth = AuthClient::new(channel, auth_token.clone());
229 let resp = tmp_auth.authenticate(name, password).await?;
230 auth_token.write().unwrap().replace(resp.token().parse()?);
231 }
232
233 Ok(())
234 }
235
236 fn build_client(
237 channel: Channel,
238 tx: Sender<Change<Uri, Endpoint>>,
239 auth_token: Arc<RwLock<Option<HeaderValue>>>,
240 options: Option<ConnectOptions>,
241 ) -> Self {
242 let kv = KvClient::new(channel.clone(), auth_token.clone());
243 let watch = WatchClient::new(channel.clone(), auth_token.clone());
244 let lease = LeaseClient::new(channel.clone(), auth_token.clone());
245 let lock = LockClient::new(channel.clone(), auth_token.clone());
246 let auth = AuthClient::new(channel.clone(), auth_token.clone());
247 let cluster = ClusterClient::new(channel.clone(), auth_token.clone());
248 let maintenance = MaintenanceClient::new(channel.clone(), auth_token.clone());
249 let election = ElectionClient::new(channel, auth_token);
250
251 Self {
252 kv,
253 watch,
254 lease,
255 lock,
256 auth,
257 maintenance,
258 cluster,
259 election,
260 options,
261 tx,
262 }
263 }
264
265 #[inline]
276 pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
277 let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
278 let tx = &self.tx;
279 tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
280 .await
281 .map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {}", e)))
282 }
283
284 #[inline]
290 pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
291 let uri = http::Uri::from_str(endpoint.as_ref())?;
292 let tx = &self.tx;
293 tx.send(Change::Remove(uri)).await.map_err(|e| {
294 Error::EndpointError(format!("failed to remove endpoint because of {}", e))
295 })
296 }
297
298 #[inline]
300 pub fn kv_client(&self) -> KvClient {
301 self.kv.clone()
302 }
303
304 #[inline]
306 pub fn watch_client(&self) -> WatchClient {
307 self.watch.clone()
308 }
309
310 #[inline]
312 pub fn lease_client(&self) -> LeaseClient {
313 self.lease.clone()
314 }
315
316 #[inline]
318 pub fn auth_client(&self) -> AuthClient {
319 self.auth.clone()
320 }
321
322 #[inline]
324 pub fn maintenance_client(&self) -> MaintenanceClient {
325 self.maintenance.clone()
326 }
327
328 #[inline]
330 pub fn cluster_client(&self) -> ClusterClient {
331 self.cluster.clone()
332 }
333
334 #[inline]
336 pub fn lock_client(&self) -> LockClient {
337 self.lock.clone()
338 }
339
340 #[inline]
342 pub fn election_client(&self) -> ElectionClient {
343 self.election.clone()
344 }
345
346 #[inline]
350 pub async fn put(
351 &mut self,
352 key: impl Into<Vec<u8>>,
353 value: impl Into<Vec<u8>>,
354 options: Option<PutOptions>,
355 ) -> Result<PutResponse> {
356 self.kv.put(key, value, options).await
357 }
358
359 #[inline]
361 pub async fn get(
362 &mut self,
363 key: impl Into<Vec<u8>>,
364 options: Option<GetOptions>,
365 ) -> Result<GetResponse> {
366 self.kv.get(key, options).await
367 }
368
369 #[inline]
371 pub async fn delete(
372 &mut self,
373 key: impl Into<Vec<u8>>,
374 options: Option<DeleteOptions>,
375 ) -> Result<DeleteResponse> {
376 self.kv.delete(key, options).await
377 }
378
379 #[inline]
383 pub async fn compact(
384 &mut self,
385 revision: i64,
386 options: Option<CompactionOptions>,
387 ) -> Result<CompactionResponse> {
388 self.kv.compact(revision, options).await
389 }
390
391 #[inline]
396 pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
397 self.kv.txn(txn).await
398 }
399
400 #[inline]
405 pub async fn watch(
406 &mut self,
407 key: impl Into<Vec<u8>>,
408 options: Option<WatchOptions>,
409 ) -> Result<(Watcher, WatchStream)> {
410 self.watch.watch(key, options).await
411 }
412
413 #[inline]
417 pub async fn lease_grant(
418 &mut self,
419 ttl: i64,
420 options: Option<LeaseGrantOptions>,
421 ) -> Result<LeaseGrantResponse> {
422 self.lease.grant(ttl, options).await
423 }
424
425 #[inline]
427 pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
428 self.lease.revoke(id).await
429 }
430
431 #[inline]
434 pub async fn lease_keep_alive(
435 &mut self,
436 id: i64,
437 ) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
438 self.lease.keep_alive(id).await
439 }
440
441 #[inline]
443 pub async fn lease_time_to_live(
444 &mut self,
445 id: i64,
446 options: Option<LeaseTimeToLiveOptions>,
447 ) -> Result<LeaseTimeToLiveResponse> {
448 self.lease.time_to_live(id, options).await
449 }
450
451 #[inline]
453 pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
454 self.lease.leases().await
455 }
456
457 #[inline]
464 pub async fn lock(
465 &mut self,
466 name: impl Into<Vec<u8>>,
467 options: Option<LockOptions>,
468 ) -> Result<LockResponse> {
469 self.lock.lock(name, options).await
470 }
471
472 #[inline]
476 pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
477 self.lock.unlock(key).await
478 }
479
480 #[inline]
482 pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
483 self.auth.auth_enable().await
484 }
485
486 #[inline]
488 pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
489 self.auth.auth_disable().await
490 }
491
492 #[inline]
494 pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
495 self.auth.role_add(name).await
496 }
497
498 #[inline]
500 pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
501 self.auth.role_delete(name).await
502 }
503
504 #[inline]
506 pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
507 self.auth.role_get(name).await
508 }
509
510 #[inline]
512 pub async fn role_list(&mut self) -> Result<RoleListResponse> {
513 self.auth.role_list().await
514 }
515
516 #[inline]
518 pub async fn role_grant_permission(
519 &mut self,
520 name: impl Into<String>,
521 perm: Permission,
522 ) -> Result<RoleGrantPermissionResponse> {
523 self.auth.role_grant_permission(name, perm).await
524 }
525
526 #[inline]
528 pub async fn role_revoke_permission(
529 &mut self,
530 name: impl Into<String>,
531 key: impl Into<Vec<u8>>,
532 options: Option<RoleRevokePermissionOptions>,
533 ) -> Result<RoleRevokePermissionResponse> {
534 self.auth.role_revoke_permission(name, key, options).await
535 }
536
537 #[inline]
539 pub async fn user_add(
540 &mut self,
541 name: impl Into<String>,
542 password: impl Into<String>,
543 options: Option<UserAddOptions>,
544 ) -> Result<UserAddResponse> {
545 self.auth.user_add(name, password, options).await
546 }
547
548 #[inline]
550 pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
551 self.auth.user_get(name).await
552 }
553
554 #[inline]
556 pub async fn user_list(&mut self) -> Result<UserListResponse> {
557 self.auth.user_list().await
558 }
559
560 #[inline]
562 pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
563 self.auth.user_delete(name).await
564 }
565
566 #[inline]
568 pub async fn user_change_password(
569 &mut self,
570 name: impl Into<String>,
571 password: impl Into<String>,
572 ) -> Result<UserChangePasswordResponse> {
573 self.auth.user_change_password(name, password).await
574 }
575
576 #[inline]
578 pub async fn user_grant_role(
579 &mut self,
580 user: impl Into<String>,
581 role: impl Into<String>,
582 ) -> Result<UserGrantRoleResponse> {
583 self.auth.user_grant_role(user, role).await
584 }
585
586 #[inline]
588 pub async fn user_revoke_role(
589 &mut self,
590 user: impl Into<String>,
591 role: impl Into<String>,
592 ) -> Result<UserRevokeRoleResponse> {
593 self.auth.user_revoke_role(user, role).await
594 }
595
596 #[inline]
598 pub async fn alarm(
599 &mut self,
600 alarm_action: AlarmAction,
601 alarm_type: AlarmType,
602 options: Option<AlarmOptions>,
603 ) -> Result<AlarmResponse> {
604 self.maintenance
605 .alarm(alarm_action, alarm_type, options)
606 .await
607 }
608
609 #[inline]
611 pub async fn status(&mut self) -> Result<StatusResponse> {
612 self.maintenance.status().await
613 }
614
615 #[inline]
617 pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
618 self.maintenance.defragment().await
619 }
620
621 #[inline]
625 pub async fn hash(&mut self) -> Result<HashResponse> {
626 self.maintenance.hash().await
627 }
628
629 #[inline]
632 pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
633 self.maintenance.hash_kv(revision).await
634 }
635
636 #[inline]
638 pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
639 self.maintenance.snapshot().await
640 }
641
642 #[inline]
644 pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
645 &mut self,
646 urls: S,
647 options: Option<MemberAddOptions>,
648 ) -> Result<MemberAddResponse> {
649 let mut eps = Vec::new();
650 for e in urls.as_ref() {
651 let e = e.as_ref();
652 let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
653 e.to_string()
654 } else {
655 HTTP_PREFIX.to_owned() + e
656 };
657 eps.push(url);
658 }
659
660 self.cluster.member_add(eps, options).await
661 }
662
663 #[inline]
665 pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
666 self.cluster.member_remove(id).await
667 }
668
669 #[inline]
671 pub async fn member_update(
672 &mut self,
673 id: u64,
674 url: impl Into<Vec<String>>,
675 ) -> Result<MemberUpdateResponse> {
676 self.cluster.member_update(id, url).await
677 }
678
679 #[inline]
681 pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
682 self.cluster.member_promote(id).await
683 }
684
685 #[inline]
687 pub async fn member_list(&mut self) -> Result<MemberListResponse> {
688 self.cluster.member_list().await
689 }
690
691 #[inline]
693 pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
694 self.maintenance.move_leader(target_id).await
695 }
696
697 #[inline]
701 pub async fn campaign(
702 &mut self,
703 name: impl Into<Vec<u8>>,
704 value: impl Into<Vec<u8>>,
705 lease: i64,
706 ) -> Result<CampaignResponse> {
707 self.election.campaign(name, value, lease).await
708 }
709
710 #[inline]
712 pub async fn proclaim(
713 &mut self,
714 value: impl Into<Vec<u8>>,
715 options: Option<ProclaimOptions>,
716 ) -> Result<ProclaimResponse> {
717 self.election.proclaim(value, options).await
718 }
719
720 #[inline]
722 pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
723 self.election.leader(name).await
724 }
725
726 #[inline]
729 pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
730 self.election.observe(name).await
731 }
732
733 #[inline]
735 pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
736 self.election.resign(option).await
737 }
738
739 pub async fn set_client_auth(&mut self, name: String, password: String) -> Result<()> {
741 self.auth.set_client_auth(name, password).await
742 }
743
744 pub fn remove_client_auth(&mut self) {
746 self.auth.remove_client_auth();
747 }
748}
749
750#[derive(Debug, Default, Clone)]
752pub struct ConnectOptions {
753 user: Option<(String, String)>,
755 keep_alive: Option<(Duration, Duration)>,
757 keep_alive_while_idle: bool,
759 timeout: Option<Duration>,
761 connect_timeout: Option<Duration>,
763 tcp_keepalive: Option<Duration>,
765 #[cfg(feature = "tls")]
766 tls: Option<TlsOptions>,
767 #[cfg(feature = "tls-openssl")]
768 otls: Option<OpenSslResult<OpenSslConnector>>,
769}
770
771impl ConnectOptions {
772 #[inline]
774 pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
775 self.user = Some((name.into(), password.into()));
776 self
777 }
778
779 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
783 #[cfg(feature = "tls")]
784 #[inline]
785 pub fn with_tls(mut self, tls: TlsOptions) -> Self {
786 self.tls = Some(tls);
787 self
788 }
789
790 #[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
792 #[cfg(feature = "tls-openssl")]
793 #[inline]
794 pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
795 self.otls = Some(otls.build());
800 self
801 }
802
803 #[inline]
805 pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
806 self.keep_alive = Some((interval, timeout));
807 self
808 }
809
810 #[inline]
812 pub fn with_timeout(mut self, timeout: Duration) -> Self {
813 self.timeout = Some(timeout);
814 self
815 }
816
817 #[inline]
819 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
820 self.connect_timeout = Some(timeout);
821 self
822 }
823
824 #[inline]
826 pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
827 self.tcp_keepalive = Some(tcp_keepalive);
828 self
829 }
830
831 #[inline]
837 pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
838 self.keep_alive_while_idle = enabled;
839 self
840 }
841
842 #[inline]
844 pub const fn new() -> Self {
845 ConnectOptions {
846 user: None,
847 keep_alive: None,
848 keep_alive_while_idle: true,
849 timeout: None,
850 connect_timeout: None,
851 tcp_keepalive: None,
852 #[cfg(feature = "tls")]
853 tls: None,
854 #[cfg(feature = "tls-openssl")]
855 otls: None,
856 }
857 }
858}