1pub mod api;
40pub mod auth;
41pub mod cache;
42pub mod common;
43pub mod config;
44pub mod crypto;
45pub mod error;
46pub mod logging;
47pub mod naming;
48pub mod remote;
49
50#[cfg(test)]
51mod tests;
52
53pub use api::config::{
54 ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigChangeNotifyRequest,
55 ConfigInfo, ConfigListenContext, ConfigPublishRequest, ConfigPublishResponse,
56 ConfigQueryRequest, ConfigQueryResponse, ConfigRemoveRequest, ConfigRemoveResponse,
57 ConfigSearchRequest, ConfigSearchResponse, ConfigSearchItem,
58};
59pub use api::naming::{
60 BatchInstanceRequest, BatchInstanceResponse, Instance, InstanceRequest, InstanceResponse,
61 QueryServiceResponse, Service, ServiceListRequest, ServiceListResponse, ServiceQueryRequest,
62 SubscribeServiceRequest, SubscribeServiceResponse,
63};
64pub use api::remote::{RequestTrait, ResponseTrait};
65pub use auth::{AccessToken, AuthManager, Credentials};
66pub use cache::FileCache;
67pub use common::constants::*;
68pub use config::{ConfigChangeEvent, ConfigChangeType, ConfigListener, ConfigService};
69pub use error::{BatataError, Result};
70pub use naming::{
71 LoadBalancer, NamingService, RandomBalancer, ServiceChangeEvent, ServiceListener,
72 WeightedRoundRobinBalancer,
73};
74pub use remote::RpcClient;
75
76use std::collections::HashMap;
77use std::sync::Arc;
78
79use parking_lot::RwLock;
80use tracing::info;
81
82#[derive(Clone, Debug)]
84pub struct CacheConfig {
85 pub cache_dir: Option<String>,
87 pub not_load_cache_at_start: bool,
89 pub update_cache_when_empty: bool,
91 pub failover_enabled: bool,
93}
94
95impl Default for CacheConfig {
96 fn default() -> Self {
97 Self {
98 cache_dir: None,
99 not_load_cache_at_start: false,
100 update_cache_when_empty: true,
101 failover_enabled: true,
102 }
103 }
104}
105
106impl CacheConfig {
107 pub fn new(cache_dir: impl Into<String>) -> Self {
109 Self {
110 cache_dir: Some(cache_dir.into()),
111 ..Default::default()
112 }
113 }
114
115 pub fn with_cache_dir(mut self, dir: impl Into<String>) -> Self {
117 self.cache_dir = Some(dir.into());
118 self
119 }
120
121 pub fn with_not_load_cache_at_start(mut self, enabled: bool) -> Self {
123 self.not_load_cache_at_start = enabled;
124 self
125 }
126
127 pub fn with_update_cache_when_empty(mut self, enabled: bool) -> Self {
129 self.update_cache_when_empty = enabled;
130 self
131 }
132
133 pub fn with_failover_enabled(mut self, enabled: bool) -> Self {
135 self.failover_enabled = enabled;
136 self
137 }
138}
139
140#[derive(Clone, Debug, Default)]
142pub struct TlsConfig {
143 pub enabled: bool,
145 pub ca_cert_path: Option<String>,
147 pub client_cert_path: Option<String>,
149 pub client_key_path: Option<String>,
151 pub skip_verify: bool,
153}
154
155impl TlsConfig {
156 pub fn new() -> Self {
158 Self {
159 enabled: true,
160 ..Default::default()
161 }
162 }
163
164 pub fn with_ca_cert(mut self, path: impl Into<String>) -> Self {
166 self.ca_cert_path = Some(path.into());
167 self
168 }
169
170 pub fn with_client_cert(
172 mut self,
173 cert_path: impl Into<String>,
174 key_path: impl Into<String>,
175 ) -> Self {
176 self.client_cert_path = Some(cert_path.into());
177 self.client_key_path = Some(key_path.into());
178 self
179 }
180
181 pub fn with_skip_verify(mut self, skip: bool) -> Self {
183 self.skip_verify = skip;
184 self
185 }
186}
187
188#[derive(Clone, Debug)]
190pub struct ClientConfig {
191 pub server_addrs: Vec<String>,
193 pub namespace: String,
195 pub app_name: String,
197 pub labels: HashMap<String, String>,
199 pub timeout_ms: u64,
201 pub retry_times: u32,
203 pub credentials: Credentials,
205 pub tls: TlsConfig,
207 pub cache: CacheConfig,
209}
210
211impl Default for ClientConfig {
212 fn default() -> Self {
213 Self {
214 server_addrs: vec!["localhost:8848".to_string()],
215 namespace: DEFAULT_NAMESPACE.to_string(),
216 app_name: String::new(),
217 labels: HashMap::new(),
218 timeout_ms: DEFAULT_TIMEOUT_MS,
219 retry_times: 3,
220 credentials: Credentials::default(),
221 tls: TlsConfig::default(),
222 cache: CacheConfig::default(),
223 }
224 }
225}
226
227pub struct BatataClientBuilder {
229 config: ClientConfig,
230}
231
232impl BatataClientBuilder {
233 pub fn new() -> Self {
235 Self {
236 config: ClientConfig::default(),
237 }
238 }
239
240 pub fn server_addr(mut self, addr: &str) -> Self {
242 self.config.server_addrs = vec![addr.to_string()];
243 self
244 }
245
246 pub fn server_addrs(mut self, addrs: Vec<String>) -> Self {
248 self.config.server_addrs = addrs;
249 self
250 }
251
252 pub fn namespace(mut self, namespace: &str) -> Self {
254 self.config.namespace = namespace.to_string();
255 self
256 }
257
258 pub fn app_name(mut self, app_name: &str) -> Self {
260 self.config.app_name = app_name.to_string();
261 self
262 }
263
264 pub fn label(mut self, key: &str, value: &str) -> Self {
266 self.config.labels.insert(key.to_string(), value.to_string());
267 self
268 }
269
270 pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
272 self.config.labels = labels;
273 self
274 }
275
276 pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
278 self.config.timeout_ms = timeout_ms;
279 self
280 }
281
282 pub fn retry_times(mut self, retry_times: u32) -> Self {
284 self.config.retry_times = retry_times;
285 self
286 }
287
288 pub fn username_password(mut self, username: &str, password: &str) -> Self {
290 self.config.credentials = Credentials::with_username_password(username, password);
291 self
292 }
293
294 pub fn access_key(mut self, access_key: &str, secret_key: &str) -> Self {
296 self.config.credentials = Credentials::with_access_key(access_key, secret_key);
297 self
298 }
299
300 pub fn credentials(mut self, credentials: Credentials) -> Self {
302 self.config.credentials = credentials;
303 self
304 }
305
306 pub fn acm(
308 mut self,
309 access_key: &str,
310 secret_key: &str,
311 endpoint: &str,
312 region_id: &str,
313 ) -> Self {
314 self.config.credentials =
315 Credentials::with_acm(access_key, secret_key, endpoint, region_id);
316 self
317 }
318
319 pub fn acm_endpoint(mut self, endpoint: &str) -> Self {
321 self.config.credentials.set_endpoint(endpoint);
322 self
323 }
324
325 pub fn acm_region_id(mut self, region_id: &str) -> Self {
327 self.config.credentials.set_region_id(region_id);
328 self
329 }
330
331 pub fn tls(mut self, enabled: bool) -> Self {
333 self.config.tls.enabled = enabled;
334 self
335 }
336
337 pub fn tls_config(mut self, tls: TlsConfig) -> Self {
339 self.config.tls = tls;
340 self
341 }
342
343 pub fn cache_dir(mut self, dir: &str) -> Self {
345 self.config.cache.cache_dir = Some(dir.to_string());
346 self
347 }
348
349 pub fn cache_config(mut self, cache: CacheConfig) -> Self {
351 self.config.cache = cache;
352 self
353 }
354
355 pub fn not_load_cache_at_start(mut self, enabled: bool) -> Self {
357 self.config.cache.not_load_cache_at_start = enabled;
358 self
359 }
360
361 pub fn update_cache_when_empty(mut self, enabled: bool) -> Self {
363 self.config.cache.update_cache_when_empty = enabled;
364 self
365 }
366
367 pub fn failover_enabled(mut self, enabled: bool) -> Self {
369 self.config.cache.failover_enabled = enabled;
370 self
371 }
372
373 pub async fn build(self) -> Result<BatataClient> {
375 BatataClient::new(self.config).await
376 }
377}
378
379impl Default for BatataClientBuilder {
380 fn default() -> Self {
381 Self::new()
382 }
383}
384
385pub struct BatataClient {
387 #[allow(dead_code)]
388 config: ClientConfig,
389 rpc_client: Arc<RpcClient>,
390 config_service: Arc<ConfigService>,
391 naming_service: Arc<NamingService>,
392 started: Arc<RwLock<bool>>,
393}
394
395impl BatataClient {
396 pub fn builder() -> BatataClientBuilder {
398 BatataClientBuilder::new()
399 }
400
401 pub async fn new(config: ClientConfig) -> Result<Self> {
403 let rpc_client = RpcClient::new(config.server_addrs.clone())?
404 .with_namespace(&config.namespace)
405 .with_app_name(&config.app_name)
406 .with_labels(config.labels.clone())
407 .with_timeout(config.timeout_ms)
408 .with_retry(config.retry_times);
409
410 rpc_client.start().await?;
412
413 let rpc_client = Arc::new(rpc_client);
414
415 let config_service = Arc::new(ConfigService::new(
417 rpc_client.clone(),
418 &config.namespace,
419 config.cache.clone(),
420 ));
421
422 let naming_service = Arc::new(NamingService::new(
424 rpc_client.clone(),
425 &config.namespace,
426 config.cache.clone(),
427 ));
428
429 let client = Self {
430 config,
431 rpc_client,
432 config_service,
433 naming_service,
434 started: Arc::new(RwLock::new(true)),
435 };
436
437 info!("BatataClient created and connected");
438
439 Ok(client)
440 }
441
442 pub fn config_service(&self) -> Arc<ConfigService> {
444 self.config_service.clone()
445 }
446
447 pub fn naming_service(&self) -> Arc<NamingService> {
449 self.naming_service.clone()
450 }
451
452 pub fn rpc_client(&self) -> Arc<RpcClient> {
454 self.rpc_client.clone()
455 }
456
457 pub fn is_connected(&self) -> bool {
459 self.rpc_client.is_connected()
460 }
461
462 pub fn connection_id(&self) -> Option<String> {
464 self.rpc_client.connection_id()
465 }
466
467 pub async fn shutdown(&self) {
469 if !*self.started.read() {
470 return;
471 }
472
473 *self.started.write() = false;
474
475 self.config_service.stop().await;
477 self.naming_service.stop().await;
478 self.rpc_client.stop().await;
479
480 info!("BatataClient shutdown");
481 }
482
483 pub async fn start_config_service(&self) -> Result<()> {
485 self.config_service.start().await
486 }
487
488 pub async fn start_naming_service(&self) -> Result<()> {
490 self.naming_service.start().await
491 }
492}
493
494impl Drop for BatataClient {
495 fn drop(&mut self) {
496 }
498}
499
500pub mod prelude {
502 pub use crate::{
503 BatataClient, BatataClientBuilder, BatataError, CacheConfig, ClientConfig,
504 ConfigChangeEvent, ConfigListener, ConfigService, Instance, NamingService, Result,
505 Service, ServiceChangeEvent, ServiceListener,
506 };
507}