conreg_client/
lib.rs

1//! # Conreg Client
2//!
3//! Conreg is a distributed service registry and configuration center designed with reference to Nacos. See details: [conreg](https://github.com/xgpxg/conreg)
4//!
5//! conreg-client is the client SDK for conreg, used to integrate into your services and communicate with conreg-server.
6//!
7//! # Quick Start
8//!
9//! ## Basic Usage
10//!
11//! Add a `bootstrap.yaml` configuration file in your project's root directory:
12//!
13//! ```yaml
14//! conreg:
15//!   # Service ID is the unique identifier of the service. Service IDs in the same namespace cannot be duplicated.
16//!   service-id: test
17//!   # Client configuration, this information will be submitted to the registry as basic information of the service instance
18//!   client:
19//!     # Listening address
20//!     address: 127.0.0.1
21//!     # Port
22//!     port: 8000
23//!   # Configuration center configuration
24//!   config:
25//!   # Configuration center address
26//!     server-addr: 127.0.0.1:8000
27//!     # Configuration ID
28//!     # If there are duplicate configuration keys in multiple configurations, the latter configuration will overwrite the previous one
29//!     config-ids:
30//!       - test.yaml
31//!     auth-token: your_token
32//!   # Registry configuration
33//!   discovery:
34//!     # Registry address
35//!     server-addr:
36//!       - 127.0.0.1:8000
37//!       - 127.0.0.1:8001
38//!       - 127.0.0.1:8002
39//!     auth-token: your_token
40//! ```
41//!
42//! Then, initialize in the `main` function:
43//!
44//! ```rust
45//! #[tokio::main]
46//! async fn main() {
47//!     // Initialization
48//!     init().await;
49//!     // Get configuration item
50//!     println!("{:?}", AppConfig::get::<String>("name"));
51//!     // Get service instances
52//!     let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//!     println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## Namespace
58//!
59//! Conreg uses namespaces to isolate configurations and services. The default namespace is `public`.
60//!
61//! ## Configuration Center
62//!
63//! Load and use configurations from the configuration center. Currently only `yaml` format configurations are supported.
64//!
65//! ### Initialize and Load Configuration
66//!
67//! ```rust
68//! #[tokio::main]
69//! async fn main() {
70//!     init_with(
71//!         ConRegConfigBuilder::default()
72//!             .config(
73//!                 ConfigConfigBuilder::default()
74//!                     .server_addr("127.0.0.1:8000")
75//!                     .namespace("public")
76//!                     .config_ids(vec!["test.yaml".into()])
77//!                     .build()
78//!                     .unwrap(),
79//!             )
80//!             .build()
81//!             .unwrap(),
82//!     )
83//!         .await;
84//!     println!("{:?}", AppConfig::get::<String>("name"));
85//!     println!("{:?}", AppConfig::get::<u32>("age"));
86//! }
87//! ```
88//!
89//! ### Initialize from Configuration File
90//!
91//! By default, conreg-client loads configurations from the bootstrap.yaml file in the project root directory to initialize configurations, just like SpringCloud.
92//! The following is an example of `bootstrap.yaml` configuration:
93//!
94//! ```yaml
95//! conreg:
96//!   config:
97//!     server-addr: 127.0.0.1:8000
98//!     config-ids:
99//!       - your_config.yaml
100//! ```
101//!
102//! Then call the `init` method to initialize and get the configuration content.
103//!
104//! ```rust
105//! #[tokio::main]
106//! async fn main() {
107//!     init().await;
108//!     // Or specify the configuration file path
109//!     // init_from_file("config.yaml").await;
110//!     println!("{:?}", AppConfig::get::<String>("name"));
111//!     println!("{:?}", AppConfig::get::<u32>("age"));
112//! }
113//! ```
114//!
115//! ## Registry Center
116//!
117//! Used for service registration and discovery.
118//!
119//! ### Initialize and Load Configuration
120//!
121//! ```rust
122//! #[tokio::main]
123//! async fn main() {
124//!     let config = ConRegConfigBuilder::default()
125//!         .service_id("your_service_id")
126//!         .client(
127//!             ClientConfigBuilder::default()
128//!                 .address("127.0.0.1")
129//!                 .port(8080)
130//!                 .build()
131//!                 .unwrap(),
132//!         )
133//!         .discovery(
134//!             DiscoveryConfigBuilder::default()
135//!                 .server_addr("127.0.0.1:8000")
136//!                 .build()
137//!                 .unwrap(),
138//!         )
139//!         .build()
140//!         .unwrap();
141//!     let service_id = config.service_id.clone();
142//!     init_with(config).await;
143//!     let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
144//!     println!("service instances: {:?}", instances);
145//! }
146//! ```
147//!
148//! ### Initialize from Configuration File
149//!
150//! By default, configurations are loaded from `bootstrap.yaml`.
151//! The following is an example configuration:
152//!
153//! ```yaml
154//! conreg:
155//!   service-id: your_service_id
156//!   client:
157//!     address: 127.0.0.1
158//!     port: 8000
159//!   discovery:
160//!     server-addr:
161//!       - 127.0.0.1:8000
162//!       - 127.0.0.1:8001
163//!       - 127.0.0.1:8002
164//! ```
165//!
166//! ```rust
167//! #[tokio::main]
168//! async fn main() {
169//!     init().await;
170//!     // Or specify the configuration file path
171//!     // init_from_file("config.yaml").await;
172//!     init_with(config).await;
173//!     let service_id = "your_service_id";
174//!     let instances = AppDiscovery::get_instances(service_id).await.unwrap();
175//!     println!("service instances: {:?}", instances);
176//! }
177//! ```
178//!
179//! # Load Balancing
180//!
181//! conreg-client provides a load balancing client based on `reqwest`, supporting custom protocol requests in the format `lb://service_id`.
182//! Reference: [lb](https://docs.rs/conreg-client/latest/conreg_client/lb/index.html)
183//!
184//! # Listen for Configuration Changes
185//!
186//! Add a handler function for the specified config_id, which will be called when the configuration changes.
187//!
188//! ```rust
189//! AppConfig::add_listener("test.yaml", |config| {
190//! println!("Config changed, new config: {:?}", config);
191//! });
192//! ```
193
194use crate::conf::{ConRegConfig, ConRegConfigWrapper};
195use crate::config::Configs;
196use crate::discovery::{Discovery, DiscoveryClient};
197pub use crate::protocol::Instance;
198use anyhow::bail;
199use serde::de::DeserializeOwned;
200use std::collections::HashMap;
201use std::path::PathBuf;
202use std::process::exit;
203use std::sync::{Arc, OnceLock, RwLock};
204
205pub mod conf;
206mod config;
207mod discovery;
208pub mod lb;
209mod network;
210mod protocol;
211mod utils;
212
213struct Conreg;
214
215/// Store configuration content
216static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
217/// Global instance for service discovery
218static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
219/// Request header for namespace authentication
220const NS_TOKEN_HEADER: &str = "X-NS-Token";
221
222impl Conreg {
223    /// Initialize configuration center and registry center
224    async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
225        let mut file = file.unwrap_or("bootstrap.yaml".into());
226        if !file.exists() {
227            file = "bootstrap.yml".into();
228        }
229        let s = match std::fs::read_to_string(&file) {
230            Ok(s) => s,
231            Err(e) => {
232                log::error!("no bootstrap.yaml found, {}", e);
233                exit(1);
234            }
235        };
236
237        log::info!("loaded bootstrap config from {}", file.display());
238
239        let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
240            Ok(config) => config,
241            Err(e) => {
242                log::error!("parse bootstrap.yaml failed, {}", e);
243                exit(1);
244            }
245        };
246
247        Self::init_with(&config.conreg).await?;
248
249        log::info!("conreg init completed");
250        Ok(())
251    }
252
253    async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
254        #[cfg(feature = "logger")]
255        utils::init_log();
256
257        if config.config.is_some() {
258            let config_client = config::ConfigClient::new(config);
259            let configs = config_client.load().await?;
260            CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
261                anyhow::anyhow!(
262                    "config has already been initialized, please do not initialize repeatedly"
263                )
264            })?;
265        }
266
267        if config.discovery.is_some() {
268            let discovery_client = DiscoveryClient::new(config);
269            discovery_client.register().await?;
270            let discovery = Discovery::new(discovery_client).await;
271            DISCOVERY.set(discovery).map_err(|_| {
272                anyhow::anyhow!(
273                    "discovery has already been initialized, please do not initialize repeatedly"
274                )
275            })?;
276        }
277
278        Ok(())
279    }
280}
281
282/// Initialize configuration center and registry center
283pub async fn init() {
284    match Conreg::init(None).await {
285        Ok(_) => {}
286        Err(e) => {
287            log::error!("conreg init failed: {}", e);
288            exit(1);
289        }
290    };
291}
292
293/// Initialize configuration center and registry center from configuration file
294pub async fn init_from_file(path: impl Into<PathBuf>) {
295    match Conreg::init(Some(path.into())).await {
296        Ok(_) => {}
297        Err(e) => {
298            log::error!("conreg init failed: {}", e);
299            exit(1);
300        }
301    };
302}
303
304/// Initialize from custom configuration
305pub async fn init_with(config: ConRegConfig) {
306    match Conreg::init_with(&config).await {
307        Ok(_) => {}
308        Err(e) => {
309            log::error!("conreg init failed: {}", e);
310            exit(1);
311        }
312    };
313}
314
315/// Application Configuration
316pub struct AppConfig;
317impl AppConfig {
318    fn reload(configs: Configs) {
319        match CONFIGS.get() {
320            None => {
321                log::error!("config not init");
322            }
323            Some(config) => {
324                *config.write().unwrap() = configs;
325            }
326        }
327    }
328
329    /// Get configuration value
330    ///
331    /// `key` is the key of the configuration item, such as `app.name`.
332    ///
333    /// Note: The type of the obtained value needs to be consistent with the type of the value in the configuration.
334    /// If they are inconsistent, it may cause conversion failure. When conversion fails, `None` will be returned.
335    pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
336        match CONFIGS.get() {
337            None => {
338                log::error!("config not init");
339                None
340            }
341            Some(config) => match config.read().expect("read lock error").get(key) {
342                None => None,
343                Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
344                    Ok(value) => Some(value),
345                    Err(e) => {
346                        log::error!("parse config failed, {}", e);
347                        None
348                    }
349                },
350            },
351        }
352    }
353
354    /// Add configuration listener
355    ///
356    /// - `config_id`: Configuration ID
357    /// - `handler`: Configuration listener function, parameter is the changed, merged and flattened configuration content
358    pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
359        Configs::add_listener(config_id, handler);
360    }
361}
362
363/// Service Discovery
364pub struct AppDiscovery;
365impl AppDiscovery {
366    /// Get available service instances for the specified service
367    pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
368        match DISCOVERY.get() {
369            Some(discovery) => {
370                let instances = discovery.get_instances(service_id).await;
371                Ok(instances)
372            }
373            None => {
374                bail!("discovery not initialized")
375            }
376        }
377    }
378}
379
380#[cfg(test)]
381#[allow(unused)]
382mod tests {
383    use super::*;
384    use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
385    use serde::Deserialize;
386    use std::collections::HashMap;
387    #[tokio::test]
388    async fn test_config() {
389        //init_log();
390        init().await;
391        //init_from_file("bootstrap.yaml").await;
392        /*init_with(
393            ConRegConfigBuilder::default()
394                .config(
395                    ConfigConfigBuilder::default()
396                        .server_addr("127.0.0.1:8000")
397                        .namespace("public")
398                        .config_ids(vec!["test.yaml".into()])
399                        .auth_token(Some("2cTtsBUpor".to_string()))
400                        .build()
401                        .unwrap(),
402                )
403                .build()
404                .unwrap(),
405        )
406        .await;*/
407        println!("{:?}", AppConfig::get::<String>("name"));
408        println!("{:?}", AppConfig::get::<u32>("age"));
409
410        AppConfig::add_listener("test.yaml", |config| {
411            println!("Listen config change1: {:?}", config);
412        });
413        AppConfig::add_listener("test2.yml", |config| {
414            println!("Listen config change2: {:?}", config);
415        });
416        let h = tokio::spawn(async move {
417            loop {
418                println!("{:?}", AppConfig::get::<String>("name"));
419                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
420            }
421        });
422        tokio::join!(h);
423    }
424
425    #[tokio::test]
426    async fn test_discovery() {
427        //init_log();
428        init().await;
429        // let config = ConRegConfigBuilder::default()
430        //     .service_id("your_service_id")
431        //     .client(
432        //         ClientConfigBuilder::default()
433        //             .address("127.0.0.1")
434        //             .port(8080)
435        //             .build()
436        //             .unwrap(),
437        //     )
438        //     .discovery(
439        //         DiscoveryConfigBuilder::default()
440        //             .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
441        //             .build()
442        //             .unwrap(),
443        //     )
444        //     .build()
445        //     .unwrap();
446        // // println!("config: {:?}", config);
447        // let service_id = config.service_id.clone();
448        // init_with(config).await;
449        let h = tokio::spawn(async move {
450            loop {
451                println!("{:?}", AppConfig::get::<String>("name"));
452                let instances = AppDiscovery::get_instances(utils::current_process_name().as_str())
453                    .await
454                    .unwrap();
455                println!("current: {:?}", instances);
456                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
457            }
458        });
459        tokio::join!(h);
460    }
461}