conreg_client/lib.rs
1//! # Conreg Client
2//!
3//! Conreg is a distributed service registry and configuration center designed with reference to Nacos. See details: [conreg](https://github.com/xgpxg/conreg)
4//!
5//! conreg-client is the client SDK for conreg, used to integrate into your services and communicate with conreg-server.
6//!
7//! # Features
8//!
9//! - Configuration Center: Load and manage configurations from conreg-server
10//! - Service Discovery: Register and discover service instances
11//! - Load Balancing: Multiple load balancing strategies (Random, Round-Robin, Weighted, etc.)
12//! - Declarative HTTP Client: Feign-like declarative microservice calling (requires `feign` feature)
13//!
14//! # Quick Start
15//!
16//! ## Basic Usage
17//!
18//! Add a `bootstrap.yaml` configuration file in your project's root directory:
19//!
20//! ```yaml
21//! conreg:
22//! # Service ID is the unique identifier of the service. Service IDs in the same namespace cannot be duplicated.
23//! service-id: test
24//! # Client configuration, this information will be submitted to the registry as basic information of the service instance
25//! client:
26//! # Listening address
27//! address: 127.0.0.1
28//! # Port
29//! port: 8000
30//! # Configuration center configuration
31//! config:
32//! # Configuration center address
33//! server-addr: 127.0.0.1:8000
34//! # Configuration ID
35//! # If there are duplicate configuration keys in multiple configurations, the latter configuration will overwrite the previous one
36//! config-ids:
37//! - test.yaml
38//! auth-token: your_token
39//! # Registry configuration
40//! discovery:
41//! # Registry address
42//! server-addr:
43//! - 127.0.0.1:8000
44//! - 127.0.0.1:8001
45//! - 127.0.0.1:8002
46//! auth-token: your_token
47//! ```
48//!
49//! Then, initialize in the `main` function:
50//!
51//! ```rust
52//! #[tokio::main]
53//! async fn main() {
54//! // Initialization
55//! init().await;
56//! // Get configuration item
57//! println!("{:?}", AppConfig::get::<String>("name"));
58//! // Get service instances
59//! let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
60//! println!("service instances: {:?}", instances);
61//! }
62//! ```
63//!
64//! ## Namespace
65//!
66//! Conreg uses namespaces to isolate configurations and services. The default namespace is `public`.
67//!
68//! ## Configuration Center
69//!
70//! Load and use configurations from the configuration center. Currently only `yaml` format configurations are supported.
71//!
72//! ### Initialize and Load Configuration
73//!
74//! ```rust
75//! #[tokio::main]
76//! async fn main() {
77//! init_with(
78//! ConRegConfigBuilder::default()
79//! .config(
80//! ConfigConfigBuilder::default()
81//! .server_addr("127.0.0.1:8000")
82//! .namespace("public")
83//! .config_ids(vec!["test.yaml".into()])
84//! .build()
85//! .unwrap(),
86//! )
87//! .build()
88//! .unwrap(),
89//! )
90//! .await;
91//! println!("{:?}", AppConfig::get::<String>("name"));
92//! println!("{:?}", AppConfig::get::<u32>("age"));
93//! }
94//! ```
95//!
96//! ### Initialize from Configuration File
97//!
//! By default, conreg-client initializes itself from the `bootstrap.yaml` file in the project root directory, similar to Spring Cloud.
99//! The following is an example of `bootstrap.yaml` configuration:
100//!
101//! ```yaml
102//! conreg:
103//! config:
104//! server-addr: 127.0.0.1:8000
105//! config-ids:
106//! - your_config.yaml
107//! ```
108//!
109//! Then call the `init` method to initialize and get the configuration content.
110//!
111//! ```rust
112//! #[tokio::main]
113//! async fn main() {
114//! init().await;
115//! // Or specify the configuration file path
116//! // init_from_file("config.yaml").await;
117//! println!("{:?}", AppConfig::get::<String>("name"));
118//! println!("{:?}", AppConfig::get::<u32>("age"));
119//! }
120//! ```
121//!
122//! ## Registry Center
123//!
124//! Used for service registration and discovery.
125//!
126//! ### Initialize and Load Configuration
127//!
128//! ```rust
129//! #[tokio::main]
130//! async fn main() {
131//! let config = ConRegConfigBuilder::default()
132//! .service_id("your_service_id")
133//! .client(
134//! ClientConfigBuilder::default()
135//! .address("127.0.0.1")
136//! .port(8080)
137//! .build()
138//! .unwrap(),
139//! )
140//! .discovery(
141//! DiscoveryConfigBuilder::default()
142//! .server_addr("127.0.0.1:8000")
143//! .build()
144//! .unwrap(),
145//! )
146//! .build()
147//! .unwrap();
148//! let service_id = config.service_id.clone();
149//! init_with(config).await;
150//! let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
151//! println!("service instances: {:?}", instances);
152//! }
153//! ```
154//!
155//! ### Initialize from Configuration File
156//!
157//! By default, configurations are loaded from `bootstrap.yaml`.
158//! The following is an example configuration:
159//!
160//! ```yaml
161//! conreg:
162//! service-id: your_service_id
163//! client:
164//! address: 127.0.0.1
165//! port: 8000
166//! discovery:
167//! server-addr:
168//! - 127.0.0.1:8000
169//! - 127.0.0.1:8001
170//! - 127.0.0.1:8002
171//! ```
172//!
173//! ```rust
174//! #[tokio::main]
175//! async fn main() {
176//! init().await;
177//! // Or specify the configuration file path
178//! // init_from_file("config.yaml").await;
//!     // Or initialize from a programmatically built configuration (instead of `init()`)
//!     // init_with(config).await;
180//! let service_id = "your_service_id";
181//! let instances = AppDiscovery::get_instances(service_id).await.unwrap();
182//! println!("service instances: {:?}", instances);
183//! }
184//! ```
185//!
186//! # Load Balancing
187//!
188//! conreg-client provides a load balancing client based on `reqwest`, supporting custom protocol requests in the format `lb://service_id`.
189//! Reference: [lb](https://docs.rs/conreg-client/latest/conreg_client/lb/index.html)
190//!
191//! # Listen for Configuration Changes
192//!
193//! Add a handler function for the specified config_id, which will be called when the configuration changes.
194//!
195//! ```rust
196//! AppConfig::add_listener("test.yaml", |config| {
197//! println!("Config changed, new config: {:?}", config);
198//! });
199//! ```
200//!
201//! # Feign-like Component
202//! [conreg-feign-macro](https://docs.rs/conreg-feign-macro) provides a macro that implements functionality similar to Java's Feign, enabling remote procedure calls across microservices.
203//!
204//! Example:
205//! ```rust
206//! #[feign_client(service_id = "user-service", base_path = "/api")]
207//! trait UserService{
208//! #[get("/api/users/{id}")]
209//! async fn get_user(&self, id: i32) -> Result<String, FeignError>;
210//!
211//! // ... other methods
212//! }
213//!
214//! // Then, you can use the generated client like this:
215//! let client = UserServiceImpl::default();
216//! let user = client.get_user(1).await?;
217//! ```
218
219use crate::conf::{ConRegConfig, ConRegConfigWrapper};
220use crate::config::Configs;
221use crate::discovery::{Discovery, DiscoveryClient};
222pub use crate::protocol::Instance;
223use anyhow::bail;
224use serde::de::DeserializeOwned;
225use std::collections::HashMap;
226use std::path::PathBuf;
227use std::process::exit;
228use std::sync::{Arc, OnceLock, RwLock};
229
230pub mod conf;
231mod config;
232mod discovery;
233pub mod lb;
234mod network;
235mod protocol;
236mod utils;
237
238#[cfg(feature = "feign")]
239pub use conreg_feign_macro::{delete, feign_client, get, patch, post, put};
240
/// Error type returned by Feign-style client calls.
///
/// Each variant wraps a plain-text description of what went wrong.
#[derive(Debug)]
pub enum FeignError {
    /// The HTTP request failed.
    RequestError(String),
    /// The response could not be deserialized.
    DeserializationError(String),
    /// No matching service instance was found.
    InstanceNotFound(String),
    /// Load balancing failed.
    LoadBalanceError(String),
}

impl std::fmt::Display for FeignError {
    /// Formats the error as `<category>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the fixed category label and the wrapped message first,
        // then emit both through a single `write!`.
        let (category, message) = match self {
            Self::RequestError(message) => ("Request error", message),
            Self::DeserializationError(message) => ("Deserialization error", message),
            Self::InstanceNotFound(message) => ("Instance not found", message),
            Self::LoadBalanceError(message) => ("Load balance error", message),
        };
        write!(f, "{category}: {message}")
    }
}

impl std::error::Error for FeignError {}
268
269impl From<crate::lb::LoadBalanceError> for FeignError {
270 fn from(err: crate::lb::LoadBalanceError) -> Self {
271 FeignError::LoadBalanceError(err.to_string())
272 }
273}
274
/// Internal, field-less type whose associated functions (`init`, `init_with`)
/// carry out the initialization behind the public `init*` free functions.
struct Conreg;
276
/// Process-wide configuration content.
///
/// Set once by `Conreg::init_with` when the supplied config has a `config` section.
/// Afterwards the content is replaced in place through the `RwLock` by
/// `AppConfig::reload` and read by `AppConfig::get` / `AppConfig::get_raw`.
static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
/// Global instance for service discovery.
///
/// Set once by `Conreg::init_with` when the supplied config has a `discovery`
/// section; queried by `AppDiscovery::get_instances`.
static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
/// Request header for namespace authentication.
// NOTE(review): not referenced in this file — presumably consumed by the
// submodules that talk to the server; confirm.
const NS_TOKEN_HEADER: &str = "X-NS-Token";
283
284impl Conreg {
285 /// Initialize configuration center and registry center
286 async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
287 #[cfg(feature = "tracing")]
288 utils::init_log();
289
290 let mut file = file.unwrap_or("bootstrap.yaml".into());
291 if !file.exists() {
292 file = "bootstrap.yml".into();
293 }
294 let s = match std::fs::read_to_string(&file) {
295 Ok(s) => s,
296 Err(e) => {
297 log::error!("no bootstrap.yaml found, {}", e);
298 exit(1);
299 }
300 };
301
302 log::info!("loaded bootstrap config from {}", file.display());
303
304 let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
305 Ok(config) => config,
306 Err(e) => {
307 log::error!("parse bootstrap.yaml failed, {}", e);
308 exit(1);
309 }
310 };
311
312 Self::init_with(&config.conreg).await?;
313
314 log::info!("conreg init completed");
315 Ok(())
316 }
317
318 async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
319 #[cfg(feature = "tracing")]
320 utils::init_log();
321
322 if config.config.is_some() {
323 let config_client = config::ConfigClient::new(config);
324 let configs = config_client.load().await?;
325 CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
326 anyhow::anyhow!(
327 "config has already been initialized, please do not initialize repeatedly"
328 )
329 })?;
330 }
331
332 if config.discovery.is_some() {
333 let discovery_client = DiscoveryClient::new(config);
334 discovery_client.register().await?;
335 let discovery = Discovery::new(discovery_client).await;
336 DISCOVERY.set(discovery).map_err(|_| {
337 anyhow::anyhow!(
338 "discovery has already been initialized, please do not initialize repeatedly"
339 )
340 })?;
341 }
342
343 Ok(())
344 }
345}
346
347/// Initialize configuration center and registry center
348pub async fn init() {
349 match Conreg::init(None).await {
350 Ok(_) => {}
351 Err(e) => {
352 log::error!("conreg init failed: {}", e);
353 exit(1);
354 }
355 };
356}
357
358/// Initialize configuration center and registry center from configuration file
359pub async fn init_from_file(path: impl Into<PathBuf>) {
360 match Conreg::init(Some(path.into())).await {
361 Ok(_) => {}
362 Err(e) => {
363 log::error!("conreg init failed: {}", e);
364 exit(1);
365 }
366 };
367}
368
369/// Initialize from custom configuration
370pub async fn init_with(config: ConRegConfig) {
371 match Conreg::init_with(&config).await {
372 Ok(_) => {}
373 Err(e) => {
374 log::error!("conreg init failed: {}", e);
375 exit(1);
376 }
377 };
378}
379
380/// Application Configuration
381pub struct AppConfig;
382impl AppConfig {
383 fn reload(configs: Configs) {
384 match CONFIGS.get() {
385 None => {
386 log::error!("config not init");
387 }
388 Some(config) => {
389 *config.write().unwrap() = configs;
390 }
391 }
392 }
393
394 /// Get configuration value
395 ///
396 /// `key` is the key of the configuration item, such as `app.name`.
397 ///
398 /// Note: The type of the obtained value needs to be consistent with the type of the value in the configuration.
399 /// If they are inconsistent, it may cause conversion failure. When conversion fails, `None` will be returned.
400 ///
401 /// This method retrieves from the flattened configuration. To retrieve the raw configuration, use `get_raw`.
402 pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
403 match CONFIGS.get() {
404 None => {
405 log::error!("config not init");
406 None
407 }
408 Some(config) => match config.read().expect("read lock error").get(key) {
409 None => None,
410 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
411 Ok(value) => Some(value),
412 Err(e) => {
413 log::error!("parse config failed, {}", e);
414 None
415 }
416 },
417 },
418 }
419 }
420
421 /// Get raw configuration value
422 pub fn get_raw<V: DeserializeOwned>(key: &str) -> Option<V> {
423 match CONFIGS.get() {
424 None => {
425 log::error!("config not init");
426 None
427 }
428 Some(config) => match config.read().expect("read lock error").get_raw(key) {
429 None => None,
430 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
431 Ok(value) => Some(value),
432 Err(e) => {
433 log::error!("parse config failed, {}", e);
434 None
435 }
436 },
437 },
438 }
439 }
440
441 /// Add configuration listener
442 ///
443 /// - `config_id`: Configuration ID
444 /// - `handler`: Configuration listener function, parameter is the changed, merged and flattened configuration content
445 pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
446 Configs::add_listener(config_id, handler);
447 }
448}
449
450/// Service Discovery
451pub struct AppDiscovery;
452impl AppDiscovery {
453 /// Get available service instances for the specified service
454 pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
455 match DISCOVERY.get() {
456 Some(discovery) => {
457 let instances = discovery.get_instances(service_id).await;
458 Ok(instances)
459 }
460 None => {
461 bail!("discovery not initialized")
462 }
463 }
464 }
465}
466
// Manual smoke tests.
//
// Both tests call `init()`, which reads the bootstrap file and terminates the whole
// process with exit code 1 on failure, and then poll in an endless loop so that changes
// can be observed by hand. They are therefore marked `#[ignore]` so that a plain
// `cargo test` neither gets killed nor hangs; run one of them explicitly with
// `cargo test <name> -- --ignored --nocapture`.
#[cfg(test)]
#[allow(unused)]
mod tests {
    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
    use crate::{AppConfig, AppDiscovery, init};
    use reqwest::StatusCode;
    use reqwest::multipart::{Form, Part};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use std::collections::HashMap;

    /// Loads the configuration, registers two change listeners and prints the `name`
    /// item once per second until interrupted.
    #[tokio::test]
    #[ignore = "manual test: needs a bootstrap file and a running conreg-server, never terminates"]
    async fn test_config() {
        //init_log();
        init().await;
        //init_from_file("bootstrap.yaml").await;
        /*init_with(
            ConRegConfigBuilder::default()
                .config(
                    ConfigConfigBuilder::default()
                        .server_addr("127.0.0.1:8000")
                        .namespace("public")
                        .config_ids(vec!["test.yaml".into()])
                        .auth_token(Some("2cTtsBUpor".to_string()))
                        .build()
                        .unwrap(),
                )
                .build()
                .unwrap(),
        )
        .await;*/
        println!("{:?}", AppConfig::get::<String>("name"));
        println!("{:?}", AppConfig::get::<u32>("age"));

        AppConfig::add_listener("test.yaml", |config| {
            println!("Listen config change1: {:?}", config);
        });
        AppConfig::add_listener("test2.yml", |config| {
            println!("Listen config change2: {:?}", config);
        });
        // Endless polling loop: keeps the test alive so listener output can be observed.
        let h = tokio::spawn(async move {
            loop {
                println!("{:?}", AppConfig::get::<String>("name"));
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });
        tokio::join!(h);
    }

    /// Initializes from the bootstrap file and prints the instances registered under the
    /// current process name once per second until interrupted.
    #[tokio::test]
    #[ignore = "manual test: needs a bootstrap file and a running conreg-server, never terminates"]
    async fn test_discovery() {
        //init_log();
        init().await;
        // let config = ConRegConfigBuilder::default()
        //     .service_id("your_service_id")
        //     .client(
        //         ClientConfigBuilder::default()
        //             .address("127.0.0.1")
        //             .port(8080)
        //             .build()
        //             .unwrap(),
        //     )
        //     .discovery(
        //         DiscoveryConfigBuilder::default()
        //             .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
        //             .build()
        //             .unwrap(),
        //     )
        //     .build()
        //     .unwrap();
        // // println!("config: {:?}", config);
        // let service_id = config.service_id.clone();
        // init_with(config).await;
        // Endless polling loop: keeps the test alive so instance changes can be observed.
        let h = tokio::spawn(async move {
            loop {
                println!("{:?}", AppConfig::get::<String>("name"));
                let instances =
                    AppDiscovery::get_instances(crate::utils::current_process_name().as_str())
                        .await
                        .unwrap();
                println!("current: {:?}", instances);
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });
        tokio::join!(h);
    }
}
553}