Skip to main content

conreg_client/
lib.rs

1//! # Conreg Client
2//!
3//! Conreg is a distributed service registry and configuration center designed with reference to Nacos. See details: [conreg](https://github.com/xgpxg/conreg)
4//!
5//! conreg-client is the client SDK for conreg, used to integrate into your services and communicate with conreg-server.
6//!
7//! # Features
8//!
9//! - Configuration Center: Load and manage configurations from conreg-server
10//! - Service Discovery: Register and discover service instances
11//! - Load Balancing: Multiple load balancing strategies (Random, Round-Robin, Weighted, etc.)
12//! - Declarative HTTP Client: Feign-like declarative microservice calling (requires `feign` feature)
13//!
14//! # Quick Start
15//!
16//! ## Basic Usage
17//!
18//! Add a `bootstrap.yaml` configuration file in your project's root directory:
19//!
20//! ```yaml
21//! conreg:
22//!   # Service ID is the unique identifier of the service. Service IDs in the same namespace cannot be duplicated.
23//!   service-id: test
24//!   # Client configuration, this information will be submitted to the registry as basic information of the service instance
25//!   client:
26//!     # Listening address
27//!     address: 127.0.0.1
28//!     # Port
29//!     port: 8000
30//!   # Configuration center configuration
31//!   config:
32//!   # Configuration center address
33//!     server-addr: 127.0.0.1:8000
34//!     # Configuration ID
35//!     # If there are duplicate configuration keys in multiple configurations, the latter configuration will overwrite the previous one
36//!     config-ids:
37//!       - test.yaml
38//!     auth-token: your_token
39//!   # Registry configuration
40//!   discovery:
41//!     # Registry address
42//!     server-addr:
43//!       - 127.0.0.1:8000
44//!       - 127.0.0.1:8001
45//!       - 127.0.0.1:8002
46//!     auth-token: your_token
47//! ```
48//!
49//! Then, initialize in the `main` function:
50//!
51//! ```rust
52//! #[tokio::main]
53//! async fn main() {
54//!     // Initialization
55//!     init().await;
56//!     // Get configuration item
57//!     println!("{:?}", AppConfig::get::<String>("name"));
58//!     // Get service instances
59//!     let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
60//!     println!("service instances: {:?}", instances);
61//! }
62//! ```
63//!
64//! ## Namespace
65//!
66//! Conreg uses namespaces to isolate configurations and services. The default namespace is `public`.
67//!
68//! ## Configuration Center
69//!
70//! Load and use configurations from the configuration center. Currently only `yaml` format configurations are supported.
71//!
72//! ### Initialize and Load Configuration
73//!
74//! ```rust
75//! #[tokio::main]
76//! async fn main() {
77//!     init_with(
78//!         ConRegConfigBuilder::default()
79//!             .config(
80//!                 ConfigConfigBuilder::default()
81//!                     .server_addr("127.0.0.1:8000")
82//!                     .namespace("public")
83//!                     .config_ids(vec!["test.yaml".into()])
84//!                     .build()
85//!                     .unwrap(),
86//!             )
87//!             .build()
88//!             .unwrap(),
89//!     )
90//!         .await;
91//!     println!("{:?}", AppConfig::get::<String>("name"));
92//!     println!("{:?}", AppConfig::get::<u32>("age"));
93//! }
94//! ```
95//!
96//! ### Initialize from Configuration File
97//!
98//! By default, conreg-client loads configurations from the bootstrap.yaml file in the project root directory to initialize configurations, just like SpringCloud.
99//! The following is an example of `bootstrap.yaml` configuration:
100//!
101//! ```yaml
102//! conreg:
103//!   config:
104//!     server-addr: 127.0.0.1:8000
105//!     config-ids:
106//!       - your_config.yaml
107//! ```
108//!
109//! Then call the `init` method to initialize and get the configuration content.
110//!
111//! ```rust
112//! #[tokio::main]
113//! async fn main() {
114//!     init().await;
115//!     // Or specify the configuration file path
116//!     // init_from_file("config.yaml").await;
117//!     println!("{:?}", AppConfig::get::<String>("name"));
118//!     println!("{:?}", AppConfig::get::<u32>("age"));
119//! }
120//! ```
121//!
122//! ## Registry Center
123//!
124//! Used for service registration and discovery.
125//!
126//! ### Initialize and Load Configuration
127//!
128//! ```rust
129//! #[tokio::main]
130//! async fn main() {
131//!     let config = ConRegConfigBuilder::default()
132//!         .service_id("your_service_id")
133//!         .client(
134//!             ClientConfigBuilder::default()
135//!                 .address("127.0.0.1")
136//!                 .port(8080)
137//!                 .build()
138//!                 .unwrap(),
139//!         )
140//!         .discovery(
141//!             DiscoveryConfigBuilder::default()
142//!                 .server_addr("127.0.0.1:8000")
143//!                 .build()
144//!                 .unwrap(),
145//!         )
146//!         .build()
147//!         .unwrap();
148//!     let service_id = config.service_id.clone();
149//!     init_with(config).await;
150//!     let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
151//!     println!("service instances: {:?}", instances);
152//! }
153//! ```
154//!
155//! ### Initialize from Configuration File
156//!
157//! By default, configurations are loaded from `bootstrap.yaml`.
158//! The following is an example configuration:
159//!
160//! ```yaml
161//! conreg:
162//!   service-id: your_service_id
163//!   client:
164//!     address: 127.0.0.1
165//!     port: 8000
166//!   discovery:
167//!     server-addr:
168//!       - 127.0.0.1:8000
169//!       - 127.0.0.1:8001
170//!       - 127.0.0.1:8002
171//! ```
172//!
173//! ```rust
174//! #[tokio::main]
175//! async fn main() {
176//!     init().await;
177//!     // Or specify the configuration file path
178//!     // init_from_file("config.yaml").await;
179//!     init_with(config).await;
180//!     let service_id = "your_service_id";
181//!     let instances = AppDiscovery::get_instances(service_id).await.unwrap();
182//!     println!("service instances: {:?}", instances);
183//! }
184//! ```
185//!
186//! # Load Balancing
187//!
188//! conreg-client provides a load balancing client based on `reqwest`, supporting custom protocol requests in the format `lb://service_id`.
189//! Reference: [lb](https://docs.rs/conreg-client/latest/conreg_client/lb/index.html)
190//!
191//! # Listen for Configuration Changes
192//!
193//! Add a handler function for the specified config_id, which will be called when the configuration changes.
194//!
195//! ```rust
196//! AppConfig::add_listener("test.yaml", |config| {
197//! println!("Config changed, new config: {:?}", config);
198//! });
199//! ```
200//!
201//! # Feign-like Component
202//! [conreg-feign-macro](https://docs.rs/conreg-feign-macro) provides a macro that implements functionality similar to Java's Feign, enabling remote procedure calls across microservices.
203//!
204//! Example:
205//! ```rust
206//! #[feign_client(service_id = "user-service", base_path = "/api")]
207//! trait UserService{
208//!   #[get("/api/users/{id}")]
209//!   async fn get_user(&self, id: i32) -> Result<String, FeignError>;
210//!
211//!   // ... other methods
212//! }
213//!
214//! // Then, you can use the generated client like this:
215//! let client = UserServiceImpl::default();
216//! let user = client.get_user(1).await?;
217//! ```
218
219use crate::conf::{ConRegConfig, ConRegConfigWrapper};
220use crate::config::Configs;
221use crate::discovery::{Discovery, DiscoveryClient};
222pub use crate::protocol::Instance;
223use anyhow::bail;
224use serde::de::DeserializeOwned;
225use std::collections::HashMap;
226use std::path::PathBuf;
227use std::process::exit;
228use std::sync::{Arc, OnceLock, RwLock};
229
230pub mod conf;
231mod config;
232mod discovery;
233pub mod lb;
234mod network;
235mod protocol;
236mod utils;
237
238#[cfg(feature = "feign")]
239pub use conreg_feign_macro::{delete, feign_client, get, patch, post, put};
240
241/// Feign client error types
242#[derive(Debug)]
243pub enum FeignError {
244    /// HTTP request error
245    RequestError(String),
246    /// Response deserialization error
247    DeserializationError(String),
248    /// Service instance not found
249    InstanceNotFound(String),
250    /// Load balance error
251    LoadBalanceError(String),
252}
253
254impl std::fmt::Display for FeignError {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        match self {
257            FeignError::RequestError(msg) => write!(f, "Request error: {}", msg),
258            FeignError::DeserializationError(msg) => {
259                write!(f, "Deserialization error: {}", msg)
260            }
261            FeignError::InstanceNotFound(msg) => write!(f, "Instance not found: {}", msg),
262            FeignError::LoadBalanceError(msg) => write!(f, "Load balance error: {}", msg),
263        }
264    }
265}
266
267impl std::error::Error for FeignError {}
268
269impl From<crate::lb::LoadBalanceError> for FeignError {
270    fn from(err: crate::lb::LoadBalanceError) -> Self {
271        FeignError::LoadBalanceError(err.to_string())
272    }
273}
274
275struct Conreg;
276
277/// Store configuration content
278static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
279/// Global instance for service discovery
280static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
281/// Request header for namespace authentication
282const NS_TOKEN_HEADER: &str = "X-NS-Token";
283
284impl Conreg {
285    /// Initialize configuration center and registry center
286    async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
287        #[cfg(feature = "tracing")]
288        utils::init_log();
289
290        let mut file = file.unwrap_or("bootstrap.yaml".into());
291        if !file.exists() {
292            file = "bootstrap.yml".into();
293        }
294        let s = match std::fs::read_to_string(&file) {
295            Ok(s) => s,
296            Err(e) => {
297                log::error!("no bootstrap.yaml found, {}", e);
298                exit(1);
299            }
300        };
301
302        log::info!("loaded bootstrap config from {}", file.display());
303
304        let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
305            Ok(config) => config,
306            Err(e) => {
307                log::error!("parse bootstrap.yaml failed, {}", e);
308                exit(1);
309            }
310        };
311
312        Self::init_with(&config.conreg).await?;
313
314        log::info!("conreg init completed");
315        Ok(())
316    }
317
318    async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
319        #[cfg(feature = "tracing")]
320        utils::init_log();
321
322        if config.config.is_some() {
323            let config_client = config::ConfigClient::new(config);
324            let configs = config_client.load().await?;
325            CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
326                anyhow::anyhow!(
327                    "config has already been initialized, please do not initialize repeatedly"
328                )
329            })?;
330        }
331
332        if config.discovery.is_some() {
333            let discovery_client = DiscoveryClient::new(config);
334            discovery_client.register().await?;
335            let discovery = Discovery::new(discovery_client).await;
336            DISCOVERY.set(discovery).map_err(|_| {
337                anyhow::anyhow!(
338                    "discovery has already been initialized, please do not initialize repeatedly"
339                )
340            })?;
341        }
342
343        Ok(())
344    }
345}
346
347/// Initialize configuration center and registry center
348pub async fn init() {
349    match Conreg::init(None).await {
350        Ok(_) => {}
351        Err(e) => {
352            log::error!("conreg init failed: {}", e);
353            exit(1);
354        }
355    };
356}
357
358/// Initialize configuration center and registry center from configuration file
359pub async fn init_from_file(path: impl Into<PathBuf>) {
360    match Conreg::init(Some(path.into())).await {
361        Ok(_) => {}
362        Err(e) => {
363            log::error!("conreg init failed: {}", e);
364            exit(1);
365        }
366    };
367}
368
369/// Initialize from custom configuration
370pub async fn init_with(config: ConRegConfig) {
371    match Conreg::init_with(&config).await {
372        Ok(_) => {}
373        Err(e) => {
374            log::error!("conreg init failed: {}", e);
375            exit(1);
376        }
377    };
378}
379
380/// Application Configuration
381pub struct AppConfig;
382impl AppConfig {
383    fn reload(configs: Configs) {
384        match CONFIGS.get() {
385            None => {
386                log::error!("config not init");
387            }
388            Some(config) => {
389                *config.write().unwrap() = configs;
390            }
391        }
392    }
393
394    /// Get configuration value
395    ///
396    /// `key` is the key of the configuration item, such as `app.name`.
397    ///
398    /// Note: The type of the obtained value needs to be consistent with the type of the value in the configuration.
399    /// If they are inconsistent, it may cause conversion failure. When conversion fails, `None` will be returned.
400    ///
401    /// This method retrieves from the flattened configuration. To retrieve the raw configuration, use `get_raw`.
402    pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
403        match CONFIGS.get() {
404            None => {
405                log::error!("config not init");
406                None
407            }
408            Some(config) => match config.read().expect("read lock error").get(key) {
409                None => None,
410                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
411                    Ok(value) => Some(value),
412                    Err(e) => {
413                        log::error!("parse config failed, {}", e);
414                        None
415                    }
416                },
417            },
418        }
419    }
420
421    /// Get raw configuration value
422    pub fn get_raw<V: DeserializeOwned>(key: &str) -> Option<V> {
423        match CONFIGS.get() {
424            None => {
425                log::error!("config not init");
426                None
427            }
428            Some(config) => match config.read().expect("read lock error").get_raw(key) {
429                None => None,
430                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
431                    Ok(value) => Some(value),
432                    Err(e) => {
433                        log::error!("parse config failed, {}", e);
434                        None
435                    }
436                },
437            },
438        }
439    }
440
441    /// Add configuration listener
442    ///
443    /// - `config_id`: Configuration ID
444    /// - `handler`: Configuration listener function, parameter is the changed, merged and flattened configuration content
445    pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
446        Configs::add_listener(config_id, handler);
447    }
448}
449
450/// Service Discovery
451pub struct AppDiscovery;
452impl AppDiscovery {
453    /// Get available service instances for the specified service
454    pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
455        match DISCOVERY.get() {
456            Some(discovery) => {
457                let instances = discovery.get_instances(service_id).await;
458                Ok(instances)
459            }
460            None => {
461                bail!("discovery not initialized")
462            }
463        }
464    }
465}
466
467#[cfg(test)]
468#[allow(unused)]
469mod tests {
470    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
471    use crate::{AppConfig, AppDiscovery, init};
472    use reqwest::StatusCode;
473    use reqwest::multipart::{Form, Part};
474    use serde::{Deserialize, Serialize};
475    use serde_json::json;
476    use std::collections::HashMap;
477
478    #[tokio::test]
479    async fn test_config() {
480        //init_log();
481        init().await;
482        //init_from_file("bootstrap.yaml").await;
483        /*init_with(
484            ConRegConfigBuilder::default()
485                .config(
486                    ConfigConfigBuilder::default()
487                        .server_addr("127.0.0.1:8000")
488                        .namespace("public")
489                        .config_ids(vec!["test.yaml".into()])
490                        .auth_token(Some("2cTtsBUpor".to_string()))
491                        .build()
492                        .unwrap(),
493                )
494                .build()
495                .unwrap(),
496        )
497        .await;*/
498        println!("{:?}", AppConfig::get::<String>("name"));
499        println!("{:?}", AppConfig::get::<u32>("age"));
500
501        AppConfig::add_listener("test.yaml", |config| {
502            println!("Listen config change1: {:?}", config);
503        });
504        AppConfig::add_listener("test2.yml", |config| {
505            println!("Listen config change2: {:?}", config);
506        });
507        let h = tokio::spawn(async move {
508            loop {
509                println!("{:?}", AppConfig::get::<String>("name"));
510                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
511            }
512        });
513        tokio::join!(h);
514    }
515
516    #[tokio::test]
517    async fn test_discovery() {
518        //init_log();
519        init().await;
520        // let config = ConRegConfigBuilder::default()
521        //     .service_id("your_service_id")
522        //     .client(
523        //         ClientConfigBuilder::default()
524        //             .address("127.0.0.1")
525        //             .port(8080)
526        //             .build()
527        //             .unwrap(),
528        //     )
529        //     .discovery(
530        //         DiscoveryConfigBuilder::default()
531        //             .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
532        //             .build()
533        //             .unwrap(),
534        //     )
535        //     .build()
536        //     .unwrap();
537        // // println!("config: {:?}", config);
538        // let service_id = config.service_id.clone();
539        // init_with(config).await;
540        let h = tokio::spawn(async move {
541            loop {
542                println!("{:?}", AppConfig::get::<String>("name"));
543                let instances =
544                    AppDiscovery::get_instances(crate::utils::current_process_name().as_str())
545                        .await
546                        .unwrap();
547                println!("current: {:?}", instances);
548                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
549            }
550        });
551        tokio::join!(h);
552    }
553}