pub mod api;
pub mod auth;
pub mod cache;
pub mod common;
pub mod config;
pub mod crypto;
pub mod error;
pub mod logging;
pub mod naming;
pub mod remote;
#[cfg(test)]
mod tests;
pub use api::config::{
ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigChangeNotifyRequest,
ConfigInfo, ConfigListenContext, ConfigPublishRequest, ConfigPublishResponse,
ConfigQueryRequest, ConfigQueryResponse, ConfigRemoveRequest, ConfigRemoveResponse,
ConfigSearchRequest, ConfigSearchResponse, ConfigSearchItem,
};
pub use api::naming::{
BatchInstanceRequest, BatchInstanceResponse, Instance, InstanceRequest, InstanceResponse,
QueryServiceResponse, Service, ServiceListRequest, ServiceListResponse, ServiceQueryRequest,
SubscribeServiceRequest, SubscribeServiceResponse,
};
pub use api::remote::{RequestTrait, ResponseTrait};
pub use auth::{AccessToken, AuthManager, Credentials};
pub use cache::FileCache;
pub use common::constants::*;
pub use config::{ConfigChangeEvent, ConfigChangeType, ConfigListener, ConfigService};
pub use error::{BatataError, Result};
pub use naming::{
LoadBalancer, NamingService, RandomBalancer, ServiceChangeEvent, ServiceListener,
WeightedRoundRobinBalancer,
};
pub use remote::RpcClient;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use tracing::info;
/// Options describing the client's local cache.
///
/// This type only stores the values; how they are interpreted is up to the
/// services that receive a copy of it.
#[derive(Clone, Debug)]
pub struct CacheConfig {
    /// Cache directory, if one was chosen explicitly. Defaults to `None`.
    pub cache_dir: Option<String>,
    /// Defaults to `false`.
    // NOTE(review): presumably skips reading the on-disk cache at startup — confirm in the consumer.
    pub not_load_cache_at_start: bool,
    /// Defaults to `true`.
    // NOTE(review): presumably controls whether an empty result may overwrite the cache — confirm.
    pub update_cache_when_empty: bool,
    /// Defaults to `true`.
    pub failover_enabled: bool,
}

impl Default for CacheConfig {
    /// No explicit directory, cache loading not suppressed, update-when-empty
    /// and failover both switched on.
    fn default() -> Self {
        CacheConfig {
            cache_dir: None,
            not_load_cache_at_start: false,
            update_cache_when_empty: true,
            failover_enabled: true,
        }
    }
}

impl CacheConfig {
    /// Default configuration with `cache_dir` set to the given directory.
    pub fn new(cache_dir: impl Into<String>) -> Self {
        Self::default().with_cache_dir(cache_dir)
    }

    /// Returns the configuration with `cache_dir` replaced by `dir`.
    pub fn with_cache_dir(self, dir: impl Into<String>) -> Self {
        Self {
            cache_dir: Some(dir.into()),
            ..self
        }
    }

    /// Returns the configuration with `not_load_cache_at_start` set to `enabled`.
    pub fn with_not_load_cache_at_start(self, enabled: bool) -> Self {
        Self {
            not_load_cache_at_start: enabled,
            ..self
        }
    }

    /// Returns the configuration with `update_cache_when_empty` set to `enabled`.
    pub fn with_update_cache_when_empty(self, enabled: bool) -> Self {
        Self {
            update_cache_when_empty: enabled,
            ..self
        }
    }

    /// Returns the configuration with `failover_enabled` set to `enabled`.
    pub fn with_failover_enabled(self, enabled: bool) -> Self {
        Self {
            failover_enabled: enabled,
            ..self
        }
    }
}
/// TLS-related options carried by the client configuration.
///
/// The derived `Default` leaves TLS disabled with no paths set, whereas
/// [`TlsConfig::new`] returns the same values with `enabled` switched on.
#[derive(Clone, Debug, Default)]
pub struct TlsConfig {
    /// Whether TLS is turned on.
    pub enabled: bool,
    /// Path to a CA certificate, if any.
    pub ca_cert_path: Option<String>,
    /// Path to the client certificate, if any.
    pub client_cert_path: Option<String>,
    /// Path to the client private key, if any.
    pub client_key_path: Option<String>,
    // NOTE(review): presumably disables peer certificate verification — confirm in the consumer.
    pub skip_verify: bool,
}

impl TlsConfig {
    /// TLS enabled, no certificate paths, `skip_verify` off.
    pub fn new() -> Self {
        TlsConfig {
            enabled: true,
            ca_cert_path: None,
            client_cert_path: None,
            client_key_path: None,
            skip_verify: false,
        }
    }

    /// Returns the configuration with the CA certificate path set to `path`.
    pub fn with_ca_cert(self, path: impl Into<String>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with both the client certificate path and
    /// the client key path set.
    pub fn with_client_cert(
        self,
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
    ) -> Self {
        Self {
            client_cert_path: Some(cert_path.into()),
            client_key_path: Some(key_path.into()),
            ..self
        }
    }

    /// Returns the configuration with `skip_verify` set to `skip`.
    pub fn with_skip_verify(self, skip: bool) -> Self {
        Self {
            skip_verify: skip,
            ..self
        }
    }
}
/// Complete set of options from which a `BatataClient` is constructed.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Server addresses; the default is the single entry `"localhost:8848"`.
    pub server_addrs: Vec<String>,
    /// Namespace; defaults to `DEFAULT_NAMESPACE`.
    pub namespace: String,
    /// Application name; empty by default.
    pub app_name: String,
    /// Free-form key/value labels; empty by default.
    pub labels: HashMap<String, String>,
    /// Timeout in milliseconds; defaults to `DEFAULT_TIMEOUT_MS`.
    pub timeout_ms: u64,
    /// Retry count; defaults to `3`.
    pub retry_times: u32,
    /// Authentication credentials; defaults to `Credentials::default()`.
    pub credentials: Credentials,
    /// TLS options; defaults to `TlsConfig::default()`.
    pub tls: TlsConfig,
    /// Local cache options; defaults to `CacheConfig::default()`.
    pub cache: CacheConfig,
}

impl Default for ClientConfig {
    /// Configuration pointing at `localhost:8848` with the crate's default
    /// namespace and timeout, three retries, and default credentials, TLS and
    /// cache settings.
    fn default() -> Self {
        let server_addrs = vec!["localhost:8848".to_string()];
        let namespace = DEFAULT_NAMESPACE.to_string();

        ClientConfig {
            server_addrs,
            namespace,
            app_name: String::new(),
            labels: HashMap::new(),
            timeout_ms: DEFAULT_TIMEOUT_MS,
            retry_times: 3,
            credentials: Credentials::default(),
            tls: TlsConfig::default(),
            cache: CacheConfig::default(),
        }
    }
}
/// Fluent builder that accumulates a [`ClientConfig`] and then creates a
/// [`BatataClient`] from it.
///
/// Every setter consumes the builder and hands it back, so calls can be
/// chained; [`BatataClientBuilder::build`] finishes the chain.
pub struct BatataClientBuilder {
    config: ClientConfig,
}

impl BatataClientBuilder {
    /// Starts from `ClientConfig::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the whole address list with the single address `addr`.
    pub fn server_addr(self, addr: &str) -> Self {
        self.server_addrs(vec![addr.to_string()])
    }

    /// Replaces the whole address list with `addrs`.
    pub fn server_addrs(mut self, addrs: Vec<String>) -> Self {
        self.config.server_addrs = addrs;
        self
    }

    /// Sets the namespace.
    pub fn namespace(mut self, namespace: &str) -> Self {
        self.config.namespace = namespace.to_string();
        self
    }

    /// Sets the application name.
    pub fn app_name(mut self, app_name: &str) -> Self {
        self.config.app_name = app_name.to_string();
        self
    }

    /// Adds one label, overwriting any existing value stored under `key`.
    pub fn label(mut self, key: &str, value: &str) -> Self {
        let (key, value) = (key.to_string(), value.to_string());
        self.config.labels.insert(key, value);
        self
    }

    /// Replaces the entire label map.
    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
        self.config.labels = labels;
        self
    }

    /// Sets the timeout in milliseconds.
    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
        self.config.timeout_ms = timeout_ms;
        self
    }

    /// Sets the retry count.
    pub fn retry_times(mut self, retry_times: u32) -> Self {
        self.config.retry_times = retry_times;
        self
    }

    /// Replaces the credentials with username/password credentials.
    pub fn username_password(self, username: &str, password: &str) -> Self {
        self.credentials(Credentials::with_username_password(username, password))
    }

    /// Replaces the credentials with access-key/secret-key credentials.
    pub fn access_key(self, access_key: &str, secret_key: &str) -> Self {
        self.credentials(Credentials::with_access_key(access_key, secret_key))
    }

    /// Replaces the credentials wholesale.
    pub fn credentials(mut self, credentials: Credentials) -> Self {
        self.config.credentials = credentials;
        self
    }

    /// Replaces the credentials with ACM credentials built from the given
    /// key pair, endpoint and region id.
    pub fn acm(
        self,
        access_key: &str,
        secret_key: &str,
        endpoint: &str,
        region_id: &str,
    ) -> Self {
        let acm_credentials = Credentials::with_acm(access_key, secret_key, endpoint, region_id);
        self.credentials(acm_credentials)
    }

    /// Updates the endpoint on the credentials currently held by the builder.
    ///
    /// A later call that replaces the credentials discards this change.
    pub fn acm_endpoint(mut self, endpoint: &str) -> Self {
        let creds = &mut self.config.credentials;
        creds.set_endpoint(endpoint);
        self
    }

    /// Updates the region id on the credentials currently held by the builder.
    ///
    /// A later call that replaces the credentials discards this change.
    pub fn acm_region_id(mut self, region_id: &str) -> Self {
        let creds = &mut self.config.credentials;
        creds.set_region_id(region_id);
        self
    }

    /// Toggles only the TLS `enabled` flag; other TLS settings are untouched.
    pub fn tls(mut self, enabled: bool) -> Self {
        self.config.tls.enabled = enabled;
        self
    }

    /// Replaces the entire TLS configuration.
    pub fn tls_config(mut self, tls: TlsConfig) -> Self {
        self.config.tls = tls;
        self
    }

    /// Sets only the cache directory; other cache settings are untouched.
    pub fn cache_dir(mut self, dir: &str) -> Self {
        let chosen_dir = dir.to_string();
        self.config.cache.cache_dir = Some(chosen_dir);
        self
    }

    /// Replaces the entire cache configuration.
    pub fn cache_config(mut self, cache: CacheConfig) -> Self {
        self.config.cache = cache;
        self
    }

    /// Sets the cache's `not_load_cache_at_start` flag.
    pub fn not_load_cache_at_start(mut self, enabled: bool) -> Self {
        self.config.cache.not_load_cache_at_start = enabled;
        self
    }

    /// Sets the cache's `update_cache_when_empty` flag.
    pub fn update_cache_when_empty(mut self, enabled: bool) -> Self {
        self.config.cache.update_cache_when_empty = enabled;
        self
    }

    /// Sets the cache's `failover_enabled` flag.
    pub fn failover_enabled(mut self, enabled: bool) -> Self {
        self.config.cache.failover_enabled = enabled;
        self
    }

    /// Consumes the builder and creates the client via `BatataClient::new`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `BatataClient::new` reports.
    pub async fn build(self) -> Result<BatataClient> {
        let Self { config } = self;
        BatataClient::new(config).await
    }
}

impl Default for BatataClientBuilder {
    /// Builder holding `ClientConfig::default()`.
    fn default() -> Self {
        Self {
            config: ClientConfig::default(),
        }
    }
}
/// Top-level client bundling one RPC connection with the config and naming
/// services that share it.
///
/// Create it with [`BatataClient::new`] or through [`BatataClient::builder`].
/// Call [`BatataClient::shutdown`] to stop the services and the connection;
/// dropping the client does not do this.
pub struct BatataClient {
    // Kept alongside the services; nothing in this type reads it after construction.
    #[allow(dead_code)]
    config: ClientConfig,
    rpc_client: Arc<RpcClient>,
    config_service: Arc<ConfigService>,
    naming_service: Arc<NamingService>,
    // `true` from construction until the first `shutdown` call flips it.
    started: Arc<RwLock<bool>>,
}

impl BatataClient {
    /// Returns a builder seeded with the default configuration.
    pub fn builder() -> BatataClientBuilder {
        BatataClientBuilder::new()
    }

    /// Creates the RPC client from `config`, starts it, and wires up the
    /// config and naming services on top of it.
    ///
    /// # Errors
    ///
    /// Propagates the error from `RpcClient::new` or from starting the RPC
    /// client.
    pub async fn new(config: ClientConfig) -> Result<Self> {
        // NOTE(review): `config.tls` and `config.credentials` are not read anywhere
        // in this function, so they are not handed to the RPC client here —
        // confirm whether that is intended.
        let rpc_client = RpcClient::new(config.server_addrs.clone())?
            .with_namespace(&config.namespace)
            .with_app_name(&config.app_name)
            .with_labels(config.labels.clone())
            .with_timeout(config.timeout_ms)
            .with_retry(config.retry_times);
        rpc_client.start().await?;
        let rpc_client = Arc::new(rpc_client);

        let config_service = Arc::new(ConfigService::new(
            Arc::clone(&rpc_client),
            &config.namespace,
            config.cache.clone(),
        ));
        let naming_service = Arc::new(NamingService::new(
            Arc::clone(&rpc_client),
            &config.namespace,
            config.cache.clone(),
        ));

        let client = Self {
            config,
            rpc_client,
            config_service,
            naming_service,
            started: Arc::new(RwLock::new(true)),
        };
        info!("BatataClient created and connected");
        Ok(client)
    }

    /// Shared handle to the config service.
    pub fn config_service(&self) -> Arc<ConfigService> {
        Arc::clone(&self.config_service)
    }

    /// Shared handle to the naming service.
    pub fn naming_service(&self) -> Arc<NamingService> {
        Arc::clone(&self.naming_service)
    }

    /// Shared handle to the underlying RPC client.
    pub fn rpc_client(&self) -> Arc<RpcClient> {
        Arc::clone(&self.rpc_client)
    }

    /// Reports the RPC client's connection state.
    pub fn is_connected(&self) -> bool {
        self.rpc_client.is_connected()
    }

    /// Returns the RPC client's connection id, if it has one.
    pub fn connection_id(&self) -> Option<String> {
        self.rpc_client.connection_id()
    }

    /// Stops the config service, the naming service and the RPC client, in
    /// that order.
    ///
    /// Only the first call does any work; later (or concurrent) calls return
    /// immediately.
    pub async fn shutdown(&self) {
        // Test-and-clear the flag under ONE write lock. Checking under a read
        // lock and clearing under a separate write lock would let two
        // concurrent callers both observe `true` and both run the stop
        // sequence. The guard lives only inside this block, so it is released
        // before any `.await` below.
        let was_started = {
            let mut started = self.started.write();
            std::mem::replace(&mut *started, false)
        };
        if !was_started {
            return;
        }

        self.config_service.stop().await;
        self.naming_service.stop().await;
        self.rpc_client.stop().await;
        info!("BatataClient shutdown");
    }

    /// Starts the config service.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ConfigService::start`.
    pub async fn start_config_service(&self) -> Result<()> {
        self.config_service.start().await
    }

    /// Starts the naming service.
    ///
    /// # Errors
    ///
    /// Propagates the error from `NamingService::start`.
    pub async fn start_naming_service(&self) -> Result<()> {
        self.naming_service.start().await
    }
}

impl Drop for BatataClient {
    /// Does nothing.
    fn drop(&mut self) {
        // NOTE(review): presumably left empty because `shutdown` is async and
        // cannot be awaited from `drop`; callers have to invoke `shutdown`
        // themselves — confirm that this is the intended contract.
    }
}
pub mod prelude {
pub use crate::{
BatataClient, BatataClientBuilder, BatataError, CacheConfig, ClientConfig,
ConfigChangeEvent, ConfigListener, ConfigService, Instance, NamingService, Result,
Service, ServiceChangeEvent, ServiceListener,
};
}