batata_client/
lib.rs

1//! Batata Client - Rust client for Batata/Nacos service discovery and configuration management
2//!
3//! # Overview
4//!
5//! Batata Client provides a Rust SDK for interacting with Batata/Nacos servers, supporting:
6//! - Configuration management (get, publish, remove, listen)
7//! - Service discovery (register, deregister, query, subscribe)
8//!
9//! # Quick Start
10//!
11//! ```rust,no_run
12//! use batata_client::{BatataClient, ClientConfig};
13//!
14//! #[tokio::main]
15//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
16//!     // Create client
17//!     let client = BatataClient::builder()
18//!         .server_addr("localhost:8848")
19//!         .namespace("public")
20//!         .build()
21//!         .await?;
22//!
23//!     // Get configuration
24//!     let config_service = client.config_service();
25//!     let content = config_service.get_config("my-config", "DEFAULT_GROUP").await?;
26//!     println!("Config: {}", content);
27//!
28//!     // Register service
29//!     let naming_service = client.naming_service();
30//!     naming_service.register_instance_simple("my-service", "127.0.0.1", 8080).await?;
31//!
32//!     // Shutdown
33//!     client.shutdown().await;
34//!
35//!     Ok(())
36//! }
37//! ```
38
39pub mod api;
40pub mod auth;
41pub mod cache;
42pub mod common;
43pub mod config;
44pub mod error;
45pub mod naming;
46pub mod remote;
47
48#[cfg(test)]
49mod tests;
50
51pub use api::config::{
52    ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigChangeNotifyRequest,
53    ConfigInfo, ConfigListenContext, ConfigPublishRequest, ConfigPublishResponse,
54    ConfigQueryRequest, ConfigQueryResponse, ConfigRemoveRequest, ConfigRemoveResponse,
55    ConfigSearchRequest, ConfigSearchResponse, ConfigSearchItem,
56};
57pub use api::naming::{
58    BatchInstanceRequest, BatchInstanceResponse, Instance, InstanceRequest, InstanceResponse,
59    QueryServiceResponse, Service, ServiceListRequest, ServiceListResponse, ServiceQueryRequest,
60    SubscribeServiceRequest, SubscribeServiceResponse,
61};
62pub use api::remote::{RequestTrait, ResponseTrait};
63pub use auth::{AccessToken, AuthManager, Credentials};
64pub use cache::FileCache;
65pub use common::constants::*;
66pub use config::{ConfigChangeEvent, ConfigChangeType, ConfigListener, ConfigService};
67pub use error::{BatataError, Result};
68pub use naming::{NamingService, ServiceChangeEvent, ServiceListener};
69pub use remote::RpcClient;
70
71use std::collections::HashMap;
72use std::sync::Arc;
73
74use parking_lot::RwLock;
75use tracing::info;
76
/// TLS settings carried inside [`ClientConfig`].
///
/// The derived [`Default`] yields TLS switched off, no certificate paths and
/// certificate verification left on (`skip_verify == false`).
///
/// The type derives `PartialEq`/`Eq` so that configurations can be compared
/// directly, e.g. in tests or when detecting configuration changes.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Whether TLS is enabled.
    pub enabled: bool,
    /// Path to the CA certificate file (PEM format), if any.
    pub ca_cert_path: Option<String>,
    /// Path to the client certificate file (PEM format), if any.
    pub client_cert_path: Option<String>,
    /// Path to the client private-key file (PEM format), if any.
    pub client_key_path: Option<String>,
    /// Skip server certificate verification (not recommended for production).
    pub skip_verify: bool,
}
91
92impl TlsConfig {
93    /// Create a new TLS config with TLS enabled
94    pub fn new() -> Self {
95        Self {
96            enabled: true,
97            ..Default::default()
98        }
99    }
100
101    /// Set CA certificate path
102    pub fn with_ca_cert(mut self, path: impl Into<String>) -> Self {
103        self.ca_cert_path = Some(path.into());
104        self
105    }
106
107    /// Set client certificate and key paths
108    pub fn with_client_cert(
109        mut self,
110        cert_path: impl Into<String>,
111        key_path: impl Into<String>,
112    ) -> Self {
113        self.client_cert_path = Some(cert_path.into());
114        self.client_key_path = Some(key_path.into());
115        self
116    }
117
118    /// Skip server certificate verification
119    pub fn with_skip_verify(mut self, skip: bool) -> Self {
120        self.skip_verify = skip;
121        self
122    }
123}
124
125/// Client configuration
126#[derive(Clone, Debug)]
127pub struct ClientConfig {
128    /// Server addresses (host:port)
129    pub server_addrs: Vec<String>,
130    /// Namespace (default: public)
131    pub namespace: String,
132    /// Application name
133    pub app_name: String,
134    /// Custom labels
135    pub labels: HashMap<String, String>,
136    /// Request timeout in milliseconds
137    pub timeout_ms: u64,
138    /// Retry times on failure
139    pub retry_times: u32,
140    /// Authentication credentials
141    pub credentials: Credentials,
142    /// TLS configuration
143    pub tls: TlsConfig,
144    /// Cache directory for local failover cache
145    pub cache_dir: Option<String>,
146}
147
148impl Default for ClientConfig {
149    fn default() -> Self {
150        Self {
151            server_addrs: vec!["localhost:8848".to_string()],
152            namespace: DEFAULT_NAMESPACE.to_string(),
153            app_name: String::new(),
154            labels: HashMap::new(),
155            timeout_ms: DEFAULT_TIMEOUT_MS,
156            retry_times: 3,
157            credentials: Credentials::default(),
158            tls: TlsConfig::default(),
159            cache_dir: None,
160        }
161    }
162}
163
164/// Builder for BatataClient
165pub struct BatataClientBuilder {
166    config: ClientConfig,
167}
168
169impl BatataClientBuilder {
170    /// Create a new builder with default configuration
171    pub fn new() -> Self {
172        Self {
173            config: ClientConfig::default(),
174        }
175    }
176
177    /// Set server address (single server)
178    pub fn server_addr(mut self, addr: &str) -> Self {
179        self.config.server_addrs = vec![addr.to_string()];
180        self
181    }
182
183    /// Set server addresses (multiple servers)
184    pub fn server_addrs(mut self, addrs: Vec<String>) -> Self {
185        self.config.server_addrs = addrs;
186        self
187    }
188
189    /// Set namespace
190    pub fn namespace(mut self, namespace: &str) -> Self {
191        self.config.namespace = namespace.to_string();
192        self
193    }
194
195    /// Set application name
196    pub fn app_name(mut self, app_name: &str) -> Self {
197        self.config.app_name = app_name.to_string();
198        self
199    }
200
201    /// Add a custom label
202    pub fn label(mut self, key: &str, value: &str) -> Self {
203        self.config.labels.insert(key.to_string(), value.to_string());
204        self
205    }
206
207    /// Set labels
208    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
209        self.config.labels = labels;
210        self
211    }
212
213    /// Set timeout in milliseconds
214    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
215        self.config.timeout_ms = timeout_ms;
216        self
217    }
218
219    /// Set retry times
220    pub fn retry_times(mut self, retry_times: u32) -> Self {
221        self.config.retry_times = retry_times;
222        self
223    }
224
225    /// Set username and password for authentication
226    pub fn username_password(mut self, username: &str, password: &str) -> Self {
227        self.config.credentials = Credentials::with_username_password(username, password);
228        self
229    }
230
231    /// Set access key and secret key for authentication
232    pub fn access_key(mut self, access_key: &str, secret_key: &str) -> Self {
233        self.config.credentials = Credentials::with_access_key(access_key, secret_key);
234        self
235    }
236
237    /// Set credentials
238    pub fn credentials(mut self, credentials: Credentials) -> Self {
239        self.config.credentials = credentials;
240        self
241    }
242
243    /// Enable TLS with default settings
244    pub fn tls(mut self, enabled: bool) -> Self {
245        self.config.tls.enabled = enabled;
246        self
247    }
248
249    /// Set TLS configuration
250    pub fn tls_config(mut self, tls: TlsConfig) -> Self {
251        self.config.tls = tls;
252        self
253    }
254
255    /// Set cache directory for local failover cache
256    pub fn cache_dir(mut self, dir: &str) -> Self {
257        self.config.cache_dir = Some(dir.to_string());
258        self
259    }
260
261    /// Build the client
262    pub async fn build(self) -> Result<BatataClient> {
263        BatataClient::new(self.config).await
264    }
265}
266
267impl Default for BatataClientBuilder {
268    fn default() -> Self {
269        Self::new()
270    }
271}
272
273/// Main Batata client
274pub struct BatataClient {
275    #[allow(dead_code)]
276    config: ClientConfig,
277    rpc_client: Arc<RpcClient>,
278    config_service: Arc<ConfigService>,
279    naming_service: Arc<NamingService>,
280    started: Arc<RwLock<bool>>,
281}
282
283impl BatataClient {
284    /// Create a new client builder
285    pub fn builder() -> BatataClientBuilder {
286        BatataClientBuilder::new()
287    }
288
289    /// Create a new client with configuration
290    pub async fn new(config: ClientConfig) -> Result<Self> {
291        let rpc_client = RpcClient::new(config.server_addrs.clone())?
292            .with_namespace(&config.namespace)
293            .with_app_name(&config.app_name)
294            .with_labels(config.labels.clone())
295            .with_timeout(config.timeout_ms)
296            .with_retry(config.retry_times);
297
298        // Start RPC client
299        rpc_client.start().await?;
300
301        let rpc_client = Arc::new(rpc_client);
302
303        // Create config service
304        let config_service = Arc::new(ConfigService::new(rpc_client.clone(), &config.namespace));
305
306        // Create naming service
307        let naming_service = Arc::new(NamingService::new(rpc_client.clone(), &config.namespace));
308
309        let client = Self {
310            config,
311            rpc_client,
312            config_service,
313            naming_service,
314            started: Arc::new(RwLock::new(true)),
315        };
316
317        info!("BatataClient created and connected");
318
319        Ok(client)
320    }
321
322    /// Get configuration service
323    pub fn config_service(&self) -> Arc<ConfigService> {
324        self.config_service.clone()
325    }
326
327    /// Get naming service
328    pub fn naming_service(&self) -> Arc<NamingService> {
329        self.naming_service.clone()
330    }
331
332    /// Get RPC client
333    pub fn rpc_client(&self) -> Arc<RpcClient> {
334        self.rpc_client.clone()
335    }
336
337    /// Check if client is connected
338    pub fn is_connected(&self) -> bool {
339        self.rpc_client.is_connected()
340    }
341
342    /// Get connection ID
343    pub fn connection_id(&self) -> Option<String> {
344        self.rpc_client.connection_id()
345    }
346
347    /// Shutdown the client
348    pub async fn shutdown(&self) {
349        if !*self.started.read() {
350            return;
351        }
352
353        *self.started.write() = false;
354
355        // Stop services
356        self.config_service.stop().await;
357        self.naming_service.stop().await;
358        self.rpc_client.stop().await;
359
360        info!("BatataClient shutdown");
361    }
362
363    /// Start config service (for listening)
364    pub async fn start_config_service(&self) -> Result<()> {
365        self.config_service.start().await
366    }
367
368    /// Start naming service (for heartbeat)
369    pub async fn start_naming_service(&self) -> Result<()> {
370        self.naming_service.start().await
371    }
372}
373
374impl Drop for BatataClient {
375    fn drop(&mut self) {
376        // Note: async drop is not possible, so cleanup should be done via shutdown()
377    }
378}
379
380// Re-export common types for convenience
381pub mod prelude {
382    pub use crate::{
383        BatataClient, BatataClientBuilder, BatataError, ClientConfig, ConfigChangeEvent,
384        ConfigListener, ConfigService, Instance, NamingService, Result, Service,
385        ServiceChangeEvent, ServiceListener,
386    };
387}