#[cfg(feature = "raw-channel")]
use crate::channel::Channel;
use crate::error::{Error, Result};
use crate::intercept::{InterceptedChannel, Interceptor};
use crate::lock::RwLockExt;
#[cfg(feature = "tls-openssl")]
use crate::openssl_tls::{OpenSslClientConfig, OpenSslConnector};
use crate::rpc::auth::Permission;
use crate::rpc::auth::{AuthClient, AuthDisableResponse, AuthEnableResponse};
use crate::rpc::auth::{
RoleAddResponse, RoleDeleteResponse, RoleGetResponse, RoleGrantPermissionResponse,
RoleListResponse, RoleRevokePermissionOptions, RoleRevokePermissionResponse, UserAddOptions,
UserAddResponse, UserChangePasswordResponse, UserDeleteResponse, UserGetResponse,
UserGrantRoleResponse, UserListResponse, UserRevokeRoleResponse,
};
use crate::rpc::cluster::{
ClusterClient, MemberAddOptions, MemberAddResponse, MemberListResponse, MemberPromoteResponse,
MemberRemoveResponse, MemberUpdateResponse,
};
use crate::rpc::election::{
CampaignResponse, ElectionClient, LeaderResponse, ObserveStream, ProclaimOptions,
ProclaimResponse, ResignOptions, ResignResponse,
};
use crate::rpc::kv::{
CompactionOptions, CompactionResponse, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
KvClient, PutOptions, PutResponse, Txn, TxnResponse,
};
use crate::rpc::lease::{
LeaseClient, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
LeaseLeasesResponse, LeaseRevokeResponse, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse,
};
use crate::rpc::lock::{LockClient, LockOptions, LockResponse, UnlockResponse};
use crate::rpc::maintenance::{
AlarmAction, AlarmOptions, AlarmResponse, AlarmType, DefragmentResponse, HashKvResponse,
HashResponse, MaintenanceClient, MoveLeaderResponse, SnapshotStreaming, StatusResponse,
};
use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream};
#[cfg(feature = "tls-openssl")]
use crate::OpenSslResult;
#[cfg(feature = "tls")]
use crate::TlsOptions;
use http::uri::Uri;
use tonic::metadata::{Ascii, MetadataValue};
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tonic::transport::{channel::Change, Endpoint};
const HTTP_PREFIX: &str = "http://";
const HTTPS_PREFIX: &str = "https://";
#[derive(Clone)]
pub struct Client {
kv: KvClient,
watch: WatchClient,
lease: LeaseClient,
lock: LockClient,
auth: AuthClient,
maintenance: MaintenanceClient,
cluster: ClusterClient,
election: ElectionClient,
options: ConnectOptions,
tx: Option<Sender<Change<Uri, Endpoint>>>,
auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
}
impl Client {
pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
endpoints: S,
options: Option<ConnectOptions>,
) -> Result<Self> {
#[cfg(not(feature = "tls-openssl"))]
let make_balanced_channel = crate::channel::Tonic;
#[cfg(feature = "tls-openssl")]
let make_balanced_channel = crate::channel::Openssl {
conn: options
.clone()
.and_then(|o| o.otls)
.unwrap_or_else(OpenSslConnector::create_default)?,
};
Self::connect_with_balanced_channel(endpoints, options, make_balanced_channel).await
}
pub async fn connect_with_balanced_channel<E: AsRef<str>, S: AsRef<[E]>, MBC>(
endpoints: S,
options: Option<ConnectOptions>,
make_balanced_channel: MBC,
) -> Result<Self>
where
MBC: crate::channel::BalancedChannelBuilder,
crate::error::Error: From<MBC::Error>,
{
let options = options.unwrap_or_default();
let endpoints = {
let mut eps = Vec::new();
for e in endpoints.as_ref() {
let channel = Self::build_endpoint(e.as_ref(), &options)?;
eps.push(channel);
}
eps
};
if endpoints.is_empty() {
return Err(Error::InvalidArgs(String::from("empty endpoints")));
}
let auth_token = Arc::new(RwLock::new(None));
let (channel, tx) = make_balanced_channel.balanced_channel(64)?;
let channel = InterceptedChannel::new(
channel,
Interceptor {
require_leader: options.require_leader,
auth_token: auth_token.clone(),
},
);
for endpoint in endpoints {
tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
.await
.map_err(|_| {
Error::Internal("failed to insert endpoint into the balanced channel".into())
})?;
}
let client = Self::build_client(channel, Some(tx), auth_token, options);
client.refresh_token().await?;
Ok(client)
}
#[cfg(feature = "raw-channel")]
pub async fn from_channel(channel: Channel, options: Option<ConnectOptions>) -> Result<Self> {
let options = options.unwrap_or_default();
let auth_token = Arc::new(RwLock::new(None));
let channel = InterceptedChannel::new(
channel,
Interceptor {
require_leader: options.require_leader,
auth_token: auth_token.clone(),
},
);
let client = Self::build_client(channel, None, auth_token, options);
client.refresh_token().await?;
Ok(client)
}
fn build_endpoint(url: &str, options: &ConnectOptions) -> Result<Endpoint> {
use tonic::transport::Channel as TonicChannel;
let mut endpoint = if url.starts_with(HTTP_PREFIX) {
#[cfg(feature = "tls")]
if options.tls.is_some() {
return Err(Error::InvalidArgs(String::from(
"TLS options are only supported with HTTPS URLs",
)));
}
TonicChannel::builder(url.parse()?)
} else if url.starts_with(HTTPS_PREFIX) {
#[cfg(not(any(feature = "tls", feature = "tls-openssl")))]
return Err(Error::InvalidArgs(String::from(
"HTTPS URLs are only supported with the feature \"tls\"",
)));
#[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
{
TonicChannel::builder(url.parse()?)
}
#[cfg(feature = "tls")]
{
let tls = options.tls.clone().unwrap_or_default();
TonicChannel::builder(url.parse()?).tls_config(tls)?
}
} else {
#[cfg(feature = "tls")]
{
let tls = options.tls.clone();
match tls {
Some(tls) => {
let e = HTTPS_PREFIX.to_owned() + url;
TonicChannel::builder(e.parse()?).tls_config(tls)?
}
None => {
let e = HTTP_PREFIX.to_owned() + url;
TonicChannel::builder(e.parse()?)
}
}
}
#[cfg(all(feature = "tls-openssl", not(feature = "tls")))]
{
let pfx = if options.otls.as_ref().is_some() {
HTTPS_PREFIX
} else {
HTTP_PREFIX
};
let e = pfx.to_owned() + url;
TonicChannel::builder(e.parse()?)
}
#[cfg(all(not(feature = "tls"), not(feature = "tls-openssl")))]
{
let e = HTTP_PREFIX.to_owned() + url;
TonicChannel::builder(e.parse()?)
}
};
if let Some((interval, timeout)) = options.keep_alive {
endpoint = endpoint
.keep_alive_while_idle(options.keep_alive_while_idle)
.http2_keep_alive_interval(interval)
.keep_alive_timeout(timeout);
}
if let Some(timeout) = options.timeout {
endpoint = endpoint.timeout(timeout);
}
if let Some(timeout) = options.connect_timeout {
endpoint = endpoint.connect_timeout(timeout);
}
if let Some(tcp_keepalive) = options.tcp_keepalive {
endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive));
}
Ok(endpoint)
}
fn build_client(
channel: InterceptedChannel,
tx: Option<Sender<Change<Uri, Endpoint>>>,
auth_token: Arc<RwLock<Option<MetadataValue<Ascii>>>>,
options: ConnectOptions,
) -> Self {
let kv = KvClient::new(channel.clone());
let watch = WatchClient::new(channel.clone());
let lease = LeaseClient::new(channel.clone());
let lock = LockClient::new(channel.clone());
let auth = AuthClient::new(channel.clone());
let cluster = ClusterClient::new(channel.clone());
let maintenance = MaintenanceClient::new(channel.clone());
let election = ElectionClient::new(channel);
Self {
kv,
watch,
lease,
lock,
auth,
maintenance,
cluster,
election,
options,
tx,
auth_token,
}
}
#[inline]
pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
let Some(tx) = &self.tx else {
return Err(Error::EndpointsNotManaged);
};
tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
.await
.map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {e}")))
}
#[inline]
pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
let uri = http::Uri::from_str(endpoint.as_ref())?;
let Some(tx) = &self.tx else {
return Err(Error::EndpointsNotManaged);
};
tx.send(Change::Remove(uri))
.await
.map_err(|e| Error::EndpointError(format!("failed to remove endpoint because of {e}")))
}
#[inline]
pub fn kv_client(&self) -> KvClient {
self.kv.clone()
}
#[inline]
pub fn watch_client(&self) -> WatchClient {
self.watch.clone()
}
#[inline]
pub fn lease_client(&self) -> LeaseClient {
self.lease.clone()
}
#[inline]
pub fn auth_client(&self) -> AuthClient {
self.auth.clone()
}
#[inline]
pub fn maintenance_client(&self) -> MaintenanceClient {
self.maintenance.clone()
}
#[inline]
pub fn cluster_client(&self) -> ClusterClient {
self.cluster.clone()
}
#[inline]
pub fn lock_client(&self) -> LockClient {
self.lock.clone()
}
#[inline]
pub fn election_client(&self) -> ElectionClient {
self.election.clone()
}
#[inline]
pub async fn put(
&mut self,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
options: Option<PutOptions>,
) -> Result<PutResponse> {
self.kv.put(key, value, options).await
}
#[inline]
pub async fn get(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<GetOptions>,
) -> Result<GetResponse> {
self.kv.get(key, options).await
}
#[inline]
pub async fn delete(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<DeleteOptions>,
) -> Result<DeleteResponse> {
self.kv.delete(key, options).await
}
#[inline]
pub async fn compact(
&mut self,
revision: i64,
options: Option<CompactionOptions>,
) -> Result<CompactionResponse> {
self.kv.compact(revision, options).await
}
#[inline]
pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
self.kv.txn(txn).await
}
#[inline]
pub async fn watch(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<WatchOptions>,
) -> Result<WatchStream> {
self.watch.watch(key, options).await
}
#[inline]
pub async fn lease_grant(
&mut self,
ttl: i64,
options: Option<LeaseGrantOptions>,
) -> Result<LeaseGrantResponse> {
self.lease.grant(ttl, options).await
}
#[inline]
pub async fn lease_revoke(&mut self, id: i64) -> Result<LeaseRevokeResponse> {
self.lease.revoke(id).await
}
#[inline]
pub async fn lease_keep_alive(
&mut self,
id: i64,
) -> Result<(LeaseKeeper, LeaseKeepAliveStream)> {
self.lease.keep_alive(id).await
}
#[inline]
pub async fn lease_time_to_live(
&mut self,
id: i64,
options: Option<LeaseTimeToLiveOptions>,
) -> Result<LeaseTimeToLiveResponse> {
self.lease.time_to_live(id, options).await
}
#[inline]
pub async fn leases(&mut self) -> Result<LeaseLeasesResponse> {
self.lease.leases().await
}
#[inline]
pub async fn lock(
&mut self,
name: impl Into<Vec<u8>>,
options: Option<LockOptions>,
) -> Result<LockResponse> {
self.lock.lock(name, options).await
}
#[inline]
pub async fn unlock(&mut self, key: impl Into<Vec<u8>>) -> Result<UnlockResponse> {
self.lock.unlock(key).await
}
#[inline]
pub async fn auth_enable(&mut self) -> Result<AuthEnableResponse> {
self.auth.auth_enable().await
}
#[inline]
pub async fn auth_disable(&mut self) -> Result<AuthDisableResponse> {
self.auth.auth_disable().await
}
#[inline]
pub async fn role_add(&mut self, name: impl Into<String>) -> Result<RoleAddResponse> {
self.auth.role_add(name).await
}
#[inline]
pub async fn role_delete(&mut self, name: impl Into<String>) -> Result<RoleDeleteResponse> {
self.auth.role_delete(name).await
}
#[inline]
pub async fn role_get(&mut self, name: impl Into<String>) -> Result<RoleGetResponse> {
self.auth.role_get(name).await
}
#[inline]
pub async fn role_list(&mut self) -> Result<RoleListResponse> {
self.auth.role_list().await
}
#[inline]
pub async fn role_grant_permission(
&mut self,
name: impl Into<String>,
perm: Permission,
) -> Result<RoleGrantPermissionResponse> {
self.auth.role_grant_permission(name, perm).await
}
#[inline]
pub async fn role_revoke_permission(
&mut self,
name: impl Into<String>,
key: impl Into<Vec<u8>>,
options: Option<RoleRevokePermissionOptions>,
) -> Result<RoleRevokePermissionResponse> {
self.auth.role_revoke_permission(name, key, options).await
}
#[inline]
pub async fn user_add(
&mut self,
name: impl Into<String>,
password: impl Into<String>,
options: Option<UserAddOptions>,
) -> Result<UserAddResponse> {
self.auth.user_add(name, password, options).await
}
#[inline]
pub async fn user_get(&mut self, name: impl Into<String>) -> Result<UserGetResponse> {
self.auth.user_get(name).await
}
#[inline]
pub async fn user_list(&mut self) -> Result<UserListResponse> {
self.auth.user_list().await
}
#[inline]
pub async fn user_delete(&mut self, name: impl Into<String>) -> Result<UserDeleteResponse> {
self.auth.user_delete(name).await
}
#[inline]
pub async fn user_change_password(
&mut self,
name: impl Into<String>,
password: impl Into<String>,
) -> Result<UserChangePasswordResponse> {
self.auth.user_change_password(name, password).await
}
#[inline]
pub async fn user_grant_role(
&mut self,
user: impl Into<String>,
role: impl Into<String>,
) -> Result<UserGrantRoleResponse> {
self.auth.user_grant_role(user, role).await
}
#[inline]
pub async fn user_revoke_role(
&mut self,
user: impl Into<String>,
role: impl Into<String>,
) -> Result<UserRevokeRoleResponse> {
self.auth.user_revoke_role(user, role).await
}
#[inline]
pub async fn alarm(
&mut self,
alarm_action: AlarmAction,
alarm_type: AlarmType,
options: Option<AlarmOptions>,
) -> Result<AlarmResponse> {
self.maintenance
.alarm(alarm_action, alarm_type, options)
.await
}
#[inline]
pub async fn status(&mut self) -> Result<StatusResponse> {
self.maintenance.status().await
}
#[inline]
pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
self.maintenance.defragment().await
}
#[inline]
pub async fn hash(&mut self) -> Result<HashResponse> {
self.maintenance.hash().await
}
#[inline]
pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
self.maintenance.hash_kv(revision).await
}
#[inline]
pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
self.maintenance.snapshot().await
}
#[inline]
pub async fn member_add<E: AsRef<str>, S: AsRef<[E]>>(
&mut self,
urls: S,
options: Option<MemberAddOptions>,
) -> Result<MemberAddResponse> {
let mut eps = Vec::new();
for e in urls.as_ref() {
let e = e.as_ref();
let url = if e.starts_with(HTTP_PREFIX) || e.starts_with(HTTPS_PREFIX) {
e.to_string()
} else {
HTTP_PREFIX.to_owned() + e
};
eps.push(url);
}
self.cluster.member_add(eps, options).await
}
#[inline]
pub async fn member_remove(&mut self, id: u64) -> Result<MemberRemoveResponse> {
self.cluster.member_remove(id).await
}
#[inline]
pub async fn member_update(
&mut self,
id: u64,
url: impl Into<Vec<String>>,
) -> Result<MemberUpdateResponse> {
self.cluster.member_update(id, url).await
}
#[inline]
pub async fn member_promote(&mut self, id: u64) -> Result<MemberPromoteResponse> {
self.cluster.member_promote(id).await
}
#[inline]
pub async fn member_list(&mut self) -> Result<MemberListResponse> {
self.cluster.member_list().await
}
#[inline]
pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
self.maintenance.move_leader(target_id).await
}
#[inline]
pub async fn campaign(
&mut self,
name: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
lease: i64,
) -> Result<CampaignResponse> {
self.election.campaign(name, value, lease).await
}
#[inline]
pub async fn proclaim(
&mut self,
value: impl Into<Vec<u8>>,
options: Option<ProclaimOptions>,
) -> Result<ProclaimResponse> {
self.election.proclaim(value, options).await
}
#[inline]
pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
self.election.leader(name).await
}
#[inline]
pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
self.election.observe(name).await
}
#[inline]
pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
self.election.resign(option).await
}
async fn do_authenticate(
&self,
user: String,
password: String,
) -> Result<MetadataValue<Ascii>> {
let resp = self.auth_client().authenticate(user, password).await?;
let token = resp.token().parse()?;
Ok(token)
}
pub async fn refresh_token(&self) -> Result<()> {
if let Some((user, password)) = self.options.user.as_ref() {
let token = self.do_authenticate(user.clone(), password.clone()).await?;
self.auth_token.write_unpoisoned().replace(token);
} else {
let _ = self.auth_token.write_unpoisoned().take();
}
Ok(())
}
pub async fn update_user(&mut self, user: Option<(String, String)>) -> Result<()> {
if let Some((ref name, ref password)) = user {
let token = self.do_authenticate(name.clone(), password.clone()).await?;
self.auth_token.write_unpoisoned().replace(token);
} else {
let _ = self.auth_token.write_unpoisoned().take();
}
self.options.user = user;
Ok(())
}
}
#[derive(Debug, Default, Clone)]
pub struct ConnectOptions {
user: Option<(String, String)>,
keep_alive: Option<(Duration, Duration)>,
keep_alive_while_idle: bool,
timeout: Option<Duration>,
connect_timeout: Option<Duration>,
tcp_keepalive: Option<Duration>,
#[cfg(feature = "tls")]
tls: Option<TlsOptions>,
#[cfg(feature = "tls-openssl")]
otls: Option<OpenSslResult<OpenSslConnector>>,
require_leader: bool,
}
impl ConnectOptions {
#[inline]
pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
self.user = Some((name.into(), password.into()));
self
}
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[cfg(feature = "tls")]
#[inline]
pub fn with_tls(mut self, tls: TlsOptions) -> Self {
self.tls = Some(tls);
self
}
#[cfg_attr(docsrs, doc(cfg(feature = "tls-openssl")))]
#[cfg(feature = "tls-openssl")]
#[inline]
pub fn with_openssl_tls(mut self, otls: OpenSslClientConfig) -> Self {
self.otls = Some(otls.build());
self
}
#[inline]
pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
self.keep_alive = Some((interval, timeout));
self
}
#[inline]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
#[inline]
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}
#[inline]
pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
self.tcp_keepalive = Some(tcp_keepalive);
self
}
#[inline]
pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
self.keep_alive_while_idle = enabled;
self
}
#[inline]
pub fn with_require_leader(mut self, require_leader: bool) -> Self {
self.require_leader = require_leader;
self
}
#[inline]
pub const fn new() -> Self {
ConnectOptions {
user: None,
keep_alive: None,
keep_alive_while_idle: true,
timeout: None,
connect_timeout: None,
tcp_keepalive: None,
#[cfg(feature = "tls")]
tls: None,
#[cfg(feature = "tls-openssl")]
otls: None,
require_leader: false,
}
}
}