batata_client/
lib.rs

1//! Batata Client - Rust client for Batata/Nacos service discovery and configuration management
2//!
3//! # Overview
4//!
5//! Batata Client provides a Rust SDK for interacting with Batata/Nacos servers, supporting:
6//! - Configuration management (get, publish, remove, listen)
7//! - Service discovery (register, deregister, query, subscribe)
8//!
9//! # Quick Start
10//!
11//! ```rust,no_run
12//! use batata_client::{BatataClient, ClientConfig};
13//!
14//! #[tokio::main]
15//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
16//!     // Create client
17//!     let client = BatataClient::builder()
18//!         .server_addr("localhost:8848")
19//!         .namespace("public")
20//!         .build()
21//!         .await?;
22//!
23//!     // Get configuration
24//!     let config_service = client.config_service();
25//!     let content = config_service.get_config("my-config", "DEFAULT_GROUP").await?;
26//!     println!("Config: {}", content);
27//!
28//!     // Register service
29//!     let naming_service = client.naming_service();
30//!     naming_service.register_instance_simple("my-service", "127.0.0.1", 8080).await?;
31//!
32//!     // Shutdown
33//!     client.shutdown().await;
34//!
35//!     Ok(())
36//! }
37//! ```
38
39pub mod api;
40pub mod auth;
41pub mod cache;
42pub mod common;
43pub mod config;
44pub mod crypto;
45pub mod error;
46pub mod logging;
47pub mod naming;
48pub mod remote;
49
50#[cfg(test)]
51mod tests;
52
53pub use api::config::{
54    ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigChangeNotifyRequest,
55    ConfigInfo, ConfigListenContext, ConfigPublishRequest, ConfigPublishResponse,
56    ConfigQueryRequest, ConfigQueryResponse, ConfigRemoveRequest, ConfigRemoveResponse,
57    ConfigSearchRequest, ConfigSearchResponse, ConfigSearchItem,
58};
59pub use api::naming::{
60    BatchInstanceRequest, BatchInstanceResponse, Instance, InstanceRequest, InstanceResponse,
61    QueryServiceResponse, Service, ServiceListRequest, ServiceListResponse, ServiceQueryRequest,
62    SubscribeServiceRequest, SubscribeServiceResponse,
63};
64pub use api::remote::{RequestTrait, ResponseTrait};
65pub use auth::{AccessToken, AuthManager, Credentials};
66pub use cache::FileCache;
67pub use common::constants::*;
68pub use config::{ConfigChangeEvent, ConfigChangeType, ConfigListener, ConfigService};
69pub use error::{BatataError, Result};
70pub use naming::{
71    LoadBalancer, NamingService, RandomBalancer, ServiceChangeEvent, ServiceListener,
72    WeightedRoundRobinBalancer,
73};
74pub use remote::RpcClient;
75
76use std::collections::HashMap;
77use std::sync::Arc;
78
79use parking_lot::RwLock;
80use tracing::info;
81
/// Settings for the local failover cache.
#[derive(Clone, Debug)]
pub struct CacheConfig {
    /// Cache directory for the local failover cache (`None` when not configured).
    pub cache_dir: Option<String>,
    /// When `true`, the cache is not loaded at startup.
    pub not_load_cache_at_start: bool,
    /// Update the in-memory cache when it is empty (failover mode).
    pub update_cache_when_empty: bool,
    /// Enable the file-based failover cache.
    pub failover_enabled: bool,
}

impl Default for CacheConfig {
    /// Defaults: no cache directory, cache loaded at start, update-when-empty
    /// and failover both enabled.
    fn default() -> Self {
        CacheConfig {
            failover_enabled: true,
            update_cache_when_empty: true,
            not_load_cache_at_start: false,
            cache_dir: None,
        }
    }
}

impl CacheConfig {
    /// Build a default cache config that uses `cache_dir` as its directory.
    pub fn new(cache_dir: impl Into<String>) -> Self {
        Self::default().with_cache_dir(cache_dir)
    }

    /// Return this config with the cache directory replaced by `dir`.
    pub fn with_cache_dir(self, dir: impl Into<String>) -> Self {
        Self {
            cache_dir: Some(dir.into()),
            ..self
        }
    }

    /// Return this config with `not_load_cache_at_start` set to `enabled`.
    pub fn with_not_load_cache_at_start(self, enabled: bool) -> Self {
        Self {
            not_load_cache_at_start: enabled,
            ..self
        }
    }

    /// Return this config with `update_cache_when_empty` set to `enabled`.
    pub fn with_update_cache_when_empty(self, enabled: bool) -> Self {
        Self {
            update_cache_when_empty: enabled,
            ..self
        }
    }

    /// Return this config with `failover_enabled` set to `enabled`.
    pub fn with_failover_enabled(self, enabled: bool) -> Self {
        Self {
            failover_enabled: enabled,
            ..self
        }
    }
}
139
/// TLS settings for the connection to the server.
///
/// The derived `Default` has TLS disabled; [`TlsConfig::new`] returns a config
/// with TLS enabled.
#[derive(Clone, Debug, Default)]
pub struct TlsConfig {
    /// Whether TLS is enabled.
    pub enabled: bool,
    /// Path to the CA certificate file (PEM format).
    pub ca_cert_path: Option<String>,
    /// Path to the client certificate file (PEM format).
    pub client_cert_path: Option<String>,
    /// Path to the client key file (PEM format).
    pub client_key_path: Option<String>,
    /// Skip server certificate verification (not recommended for production).
    pub skip_verify: bool,
}

impl TlsConfig {
    /// Create a TLS config with TLS switched on and every other field at its default.
    pub fn new() -> Self {
        Self {
            enabled: true,
            ..Self::default()
        }
    }

    /// Return this config with the CA certificate path set to `path`.
    pub fn with_ca_cert(self, path: impl Into<String>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Return this config with both the client certificate and key paths set.
    pub fn with_client_cert(
        self,
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
    ) -> Self {
        Self {
            client_cert_path: Some(cert_path.into()),
            client_key_path: Some(key_path.into()),
            ..self
        }
    }

    /// Return this config with server certificate verification skipped when `skip` is `true`.
    pub fn with_skip_verify(self, skip: bool) -> Self {
        Self {
            skip_verify: skip,
            ..self
        }
    }
}
187
188/// Client configuration
189#[derive(Clone, Debug)]
190pub struct ClientConfig {
191    /// Server addresses (host:port)
192    pub server_addrs: Vec<String>,
193    /// Namespace (default: public)
194    pub namespace: String,
195    /// Application name
196    pub app_name: String,
197    /// Custom labels
198    pub labels: HashMap<String, String>,
199    /// Request timeout in milliseconds
200    pub timeout_ms: u64,
201    /// Retry times on failure
202    pub retry_times: u32,
203    /// Authentication credentials
204    pub credentials: Credentials,
205    /// TLS configuration
206    pub tls: TlsConfig,
207    /// Cache configuration
208    pub cache: CacheConfig,
209}
210
211impl Default for ClientConfig {
212    fn default() -> Self {
213        Self {
214            server_addrs: vec!["localhost:8848".to_string()],
215            namespace: DEFAULT_NAMESPACE.to_string(),
216            app_name: String::new(),
217            labels: HashMap::new(),
218            timeout_ms: DEFAULT_TIMEOUT_MS,
219            retry_times: 3,
220            credentials: Credentials::default(),
221            tls: TlsConfig::default(),
222            cache: CacheConfig::default(),
223        }
224    }
225}
226
227/// Builder for BatataClient
228pub struct BatataClientBuilder {
229    config: ClientConfig,
230}
231
232impl BatataClientBuilder {
233    /// Create a new builder with default configuration
234    pub fn new() -> Self {
235        Self {
236            config: ClientConfig::default(),
237        }
238    }
239
240    /// Set server address (single server)
241    pub fn server_addr(mut self, addr: &str) -> Self {
242        self.config.server_addrs = vec![addr.to_string()];
243        self
244    }
245
246    /// Set server addresses (multiple servers)
247    pub fn server_addrs(mut self, addrs: Vec<String>) -> Self {
248        self.config.server_addrs = addrs;
249        self
250    }
251
252    /// Set namespace
253    pub fn namespace(mut self, namespace: &str) -> Self {
254        self.config.namespace = namespace.to_string();
255        self
256    }
257
258    /// Set application name
259    pub fn app_name(mut self, app_name: &str) -> Self {
260        self.config.app_name = app_name.to_string();
261        self
262    }
263
264    /// Add a custom label
265    pub fn label(mut self, key: &str, value: &str) -> Self {
266        self.config.labels.insert(key.to_string(), value.to_string());
267        self
268    }
269
270    /// Set labels
271    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
272        self.config.labels = labels;
273        self
274    }
275
276    /// Set timeout in milliseconds
277    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
278        self.config.timeout_ms = timeout_ms;
279        self
280    }
281
282    /// Set retry times
283    pub fn retry_times(mut self, retry_times: u32) -> Self {
284        self.config.retry_times = retry_times;
285        self
286    }
287
288    /// Set username and password for authentication
289    pub fn username_password(mut self, username: &str, password: &str) -> Self {
290        self.config.credentials = Credentials::with_username_password(username, password);
291        self
292    }
293
294    /// Set access key and secret key for authentication
295    pub fn access_key(mut self, access_key: &str, secret_key: &str) -> Self {
296        self.config.credentials = Credentials::with_access_key(access_key, secret_key);
297        self
298    }
299
300    /// Set credentials
301    pub fn credentials(mut self, credentials: Credentials) -> Self {
302        self.config.credentials = credentials;
303        self
304    }
305
306    /// Configure for Alibaba Cloud ACM
307    pub fn acm(
308        mut self,
309        access_key: &str,
310        secret_key: &str,
311        endpoint: &str,
312        region_id: &str,
313    ) -> Self {
314        self.config.credentials =
315            Credentials::with_acm(access_key, secret_key, endpoint, region_id);
316        self
317    }
318
319    /// Set ACM endpoint
320    pub fn acm_endpoint(mut self, endpoint: &str) -> Self {
321        self.config.credentials.set_endpoint(endpoint);
322        self
323    }
324
325    /// Set ACM region ID
326    pub fn acm_region_id(mut self, region_id: &str) -> Self {
327        self.config.credentials.set_region_id(region_id);
328        self
329    }
330
331    /// Enable TLS with default settings
332    pub fn tls(mut self, enabled: bool) -> Self {
333        self.config.tls.enabled = enabled;
334        self
335    }
336
337    /// Set TLS configuration
338    pub fn tls_config(mut self, tls: TlsConfig) -> Self {
339        self.config.tls = tls;
340        self
341    }
342
343    /// Set cache directory for local failover cache
344    pub fn cache_dir(mut self, dir: &str) -> Self {
345        self.config.cache.cache_dir = Some(dir.to_string());
346        self
347    }
348
349    /// Set cache configuration
350    pub fn cache_config(mut self, cache: CacheConfig) -> Self {
351        self.config.cache = cache;
352        self
353    }
354
355    /// Set not load cache at startup
356    pub fn not_load_cache_at_start(mut self, enabled: bool) -> Self {
357        self.config.cache.not_load_cache_at_start = enabled;
358        self
359    }
360
361    /// Set update cache when empty (failover mode)
362    pub fn update_cache_when_empty(mut self, enabled: bool) -> Self {
363        self.config.cache.update_cache_when_empty = enabled;
364        self
365    }
366
367    /// Enable/disable failover cache
368    pub fn failover_enabled(mut self, enabled: bool) -> Self {
369        self.config.cache.failover_enabled = enabled;
370        self
371    }
372
373    /// Build the client
374    pub async fn build(self) -> Result<BatataClient> {
375        BatataClient::new(self.config).await
376    }
377}
378
379impl Default for BatataClientBuilder {
380    fn default() -> Self {
381        Self::new()
382    }
383}
384
385/// Main Batata client
386pub struct BatataClient {
387    #[allow(dead_code)]
388    config: ClientConfig,
389    rpc_client: Arc<RpcClient>,
390    config_service: Arc<ConfigService>,
391    naming_service: Arc<NamingService>,
392    started: Arc<RwLock<bool>>,
393}
394
395impl BatataClient {
396    /// Create a new client builder
397    pub fn builder() -> BatataClientBuilder {
398        BatataClientBuilder::new()
399    }
400
401    /// Create a new client with configuration
402    pub async fn new(config: ClientConfig) -> Result<Self> {
403        let rpc_client = RpcClient::new(config.server_addrs.clone())?
404            .with_namespace(&config.namespace)
405            .with_app_name(&config.app_name)
406            .with_labels(config.labels.clone())
407            .with_timeout(config.timeout_ms)
408            .with_retry(config.retry_times);
409
410        // Start RPC client
411        rpc_client.start().await?;
412
413        let rpc_client = Arc::new(rpc_client);
414
415        // Create config service with cache config
416        let config_service = Arc::new(ConfigService::new(
417            rpc_client.clone(),
418            &config.namespace,
419            config.cache.clone(),
420        ));
421
422        // Create naming service with cache config
423        let naming_service = Arc::new(NamingService::new(
424            rpc_client.clone(),
425            &config.namespace,
426            config.cache.clone(),
427        ));
428
429        let client = Self {
430            config,
431            rpc_client,
432            config_service,
433            naming_service,
434            started: Arc::new(RwLock::new(true)),
435        };
436
437        info!("BatataClient created and connected");
438
439        Ok(client)
440    }
441
442    /// Get configuration service
443    pub fn config_service(&self) -> Arc<ConfigService> {
444        self.config_service.clone()
445    }
446
447    /// Get naming service
448    pub fn naming_service(&self) -> Arc<NamingService> {
449        self.naming_service.clone()
450    }
451
452    /// Get RPC client
453    pub fn rpc_client(&self) -> Arc<RpcClient> {
454        self.rpc_client.clone()
455    }
456
457    /// Check if client is connected
458    pub fn is_connected(&self) -> bool {
459        self.rpc_client.is_connected()
460    }
461
462    /// Get connection ID
463    pub fn connection_id(&self) -> Option<String> {
464        self.rpc_client.connection_id()
465    }
466
467    /// Shutdown the client
468    pub async fn shutdown(&self) {
469        if !*self.started.read() {
470            return;
471        }
472
473        *self.started.write() = false;
474
475        // Stop services
476        self.config_service.stop().await;
477        self.naming_service.stop().await;
478        self.rpc_client.stop().await;
479
480        info!("BatataClient shutdown");
481    }
482
483    /// Start config service (for listening)
484    pub async fn start_config_service(&self) -> Result<()> {
485        self.config_service.start().await
486    }
487
488    /// Start naming service (for heartbeat)
489    pub async fn start_naming_service(&self) -> Result<()> {
490        self.naming_service.start().await
491    }
492}
493
494impl Drop for BatataClient {
495    fn drop(&mut self) {
496        // Note: async drop is not possible, so cleanup should be done via shutdown()
497    }
498}
499
500// Re-export common types for convenience
501pub mod prelude {
502    pub use crate::{
503        BatataClient, BatataClientBuilder, BatataError, CacheConfig, ClientConfig,
504        ConfigChangeEvent, ConfigListener, ConfigService, Instance, NamingService, Result,
505        Service, ServiceChangeEvent, ServiceListener,
506    };
507}