conreg_client/lib.rs
1//! # Conreg Client
2//!
3//! Conreg is a distributed service registry and configuration center designed with reference to Nacos. See details: [conreg](https://github.com/xgpxg/conreg)
4//!
5//! conreg-client is the client SDK for conreg, used to integrate into your services and communicate with conreg-server.
6//!
7//! # Quick Start
8//!
9//! ## Basic Usage
10//!
11//! Add a `bootstrap.yaml` configuration file in your project's root directory:
12//!
13//! ```yaml
14//! conreg:
15//! # Service ID is the unique identifier of the service. Service IDs in the same namespace cannot be duplicated.
16//! service-id: test
17//! # Client configuration, this information will be submitted to the registry as basic information of the service instance
18//! client:
19//! # Listening address
20//! address: 127.0.0.1
21//! # Port
22//! port: 8000
23//! # Configuration center configuration
24//! config:
25//! # Configuration center address
26//! server-addr: 127.0.0.1:8000
27//! # Configuration ID
28//! # If there are duplicate configuration keys in multiple configurations, the latter configuration will overwrite the previous one
29//! config-ids:
30//! - test.yaml
31//! auth-token: your_token
32//! # Registry configuration
33//! discovery:
34//! # Registry address
35//! server-addr:
36//! - 127.0.0.1:8000
37//! - 127.0.0.1:8001
38//! - 127.0.0.1:8002
39//! auth-token: your_token
40//! ```
41//!
42//! Then, initialize in the `main` function:
43//!
44//! ```rust
45//! #[tokio::main]
46//! async fn main() {
47//! // Initialization
48//! init().await;
49//! // Get configuration item
50//! println!("{:?}", AppConfig::get::<String>("name"));
51//! // Get service instances
52//! let instances = AppDiscovery::get_instances("your_service_id").await.unwrap();
53//! println!("service instances: {:?}", instances);
54//! }
55//! ```
56//!
57//! ## Namespace
58//!
59//! Conreg uses namespaces to isolate configurations and services. The default namespace is `public`.
60//!
61//! ## Configuration Center
62//!
63//! Load and use configurations from the configuration center. Currently only `yaml` format configurations are supported.
64//!
65//! ### Initialize and Load Configuration
66//!
67//! ```rust
68//! #[tokio::main]
69//! async fn main() {
70//! init_with(
71//! ConRegConfigBuilder::default()
72//! .config(
73//! ConfigConfigBuilder::default()
74//! .server_addr("127.0.0.1:8000")
75//! .namespace("public")
76//! .config_ids(vec!["test.yaml".into()])
77//! .build()
78//! .unwrap(),
79//! )
80//! .build()
81//! .unwrap(),
82//! )
83//! .await;
84//! println!("{:?}", AppConfig::get::<String>("name"));
85//! println!("{:?}", AppConfig::get::<u32>("age"));
86//! }
87//! ```
88//!
89//! ### Initialize from Configuration File
90//!
91//! By default, conreg-client loads configurations from the bootstrap.yaml file in the project root directory to initialize configurations, just like SpringCloud.
92//! The following is an example of `bootstrap.yaml` configuration:
93//!
94//! ```yaml
95//! conreg:
96//! config:
97//! server-addr: 127.0.0.1:8000
98//! config-ids:
99//! - your_config.yaml
100//! ```
101//!
102//! Then call the `init` method to initialize and get the configuration content.
103//!
104//! ```rust
105//! #[tokio::main]
106//! async fn main() {
107//! init().await;
108//! // Or specify the configuration file path
109//! // init_from_file("config.yaml").await;
110//! println!("{:?}", AppConfig::get::<String>("name"));
111//! println!("{:?}", AppConfig::get::<u32>("age"));
112//! }
113//! ```
114//!
115//! ## Registry Center
116//!
117//! Used for service registration and discovery.
118//!
119//! ### Initialize and Load Configuration
120//!
121//! ```rust
122//! #[tokio::main]
123//! async fn main() {
124//! let config = ConRegConfigBuilder::default()
125//! .service_id("your_service_id")
126//! .client(
127//! ClientConfigBuilder::default()
128//! .address("127.0.0.1")
129//! .port(8080)
130//! .build()
131//! .unwrap(),
132//! )
133//! .discovery(
134//! DiscoveryConfigBuilder::default()
135//! .server_addr("127.0.0.1:8000")
136//! .build()
137//! .unwrap(),
138//! )
139//! .build()
140//! .unwrap();
141//! let service_id = config.service_id.clone();
142//! init_with(config).await;
143//! let instances = AppDiscovery::get_instances(&service_id).await.unwrap();
144//! println!("service instances: {:?}", instances);
145//! }
146//! ```
147//!
148//! ### Initialize from Configuration File
149//!
150//! By default, configurations are loaded from `bootstrap.yaml`.
151//! The following is an example configuration:
152//!
153//! ```yaml
154//! conreg:
155//! service-id: your_service_id
156//! client:
157//! address: 127.0.0.1
158//! port: 8000
159//! discovery:
160//! server-addr:
161//! - 127.0.0.1:8000
162//! - 127.0.0.1:8001
163//! - 127.0.0.1:8002
164//! ```
165//!
166//! ```rust
167//! #[tokio::main]
168//! async fn main() {
169//! init().await;
170//! // Or specify the configuration file path
171//! // init_from_file("config.yaml").await;
172//! init_with(config).await;
173//! let service_id = "your_service_id";
174//! let instances = AppDiscovery::get_instances(service_id).await.unwrap();
175//! println!("service instances: {:?}", instances);
176//! }
177//! ```
178//!
179//! # Load Balancing
180//!
181//! conreg-client provides a load balancing client based on `reqwest`, supporting custom protocol requests in the format `lb://service_id`.
182//! Reference: [lb](https://docs.rs/conreg-client/latest/conreg_client/lb/index.html)
183//!
184//! # Listen for Configuration Changes
185//!
186//! Add a handler function for the specified config_id, which will be called when the configuration changes.
187//!
188//! ```rust
189//! AppConfig::add_listener("test.yaml", |config| {
190//! println!("Config changed, new config: {:?}", config);
191//! });
192//! ```
193
194use crate::conf::{ConRegConfig, ConRegConfigWrapper};
195use crate::config::Configs;
196use crate::discovery::{Discovery, DiscoveryClient};
197pub use crate::protocol::Instance;
198use anyhow::bail;
199use serde::de::DeserializeOwned;
200use std::collections::HashMap;
201use std::path::PathBuf;
202use std::process::exit;
203use std::sync::{Arc, OnceLock, RwLock};
204
205pub mod conf;
206mod config;
207mod discovery;
208pub mod lb;
209mod network;
210mod protocol;
211mod utils;
212
213struct Conreg;
214
215/// Store configuration content
216static CONFIGS: OnceLock<Arc<RwLock<Configs>>> = OnceLock::new();
217/// Global instance for service discovery
218static DISCOVERY: OnceLock<Discovery> = OnceLock::new();
219/// Request header for namespace authentication
220const NS_TOKEN_HEADER: &str = "X-NS-Token";
221
222impl Conreg {
223 /// Initialize configuration center and registry center
224 async fn init(file: Option<PathBuf>) -> anyhow::Result<()> {
225 let mut file = file.unwrap_or("bootstrap.yaml".into());
226 if !file.exists() {
227 file = "bootstrap.yml".into();
228 }
229 let s = match std::fs::read_to_string(&file) {
230 Ok(s) => s,
231 Err(e) => {
232 log::error!("no bootstrap.yaml found, {}", e);
233 exit(1);
234 }
235 };
236
237 log::info!("loaded bootstrap config from {}", file.display());
238
239 let config = match serde_yaml::from_str::<ConRegConfigWrapper>(&s) {
240 Ok(config) => config,
241 Err(e) => {
242 log::error!("parse bootstrap.yaml failed, {}", e);
243 exit(1);
244 }
245 };
246
247 Self::init_with(&config.conreg).await?;
248
249 log::info!("conreg init completed");
250 Ok(())
251 }
252
253 async fn init_with(config: &ConRegConfig) -> anyhow::Result<()> {
254 #[cfg(feature = "logger")]
255 utils::init_log();
256
257 if config.config.is_some() {
258 let config_client = config::ConfigClient::new(config);
259 let configs = config_client.load().await?;
260 CONFIGS.set(Arc::new(RwLock::new(configs))).map_err(|_| {
261 anyhow::anyhow!(
262 "config has already been initialized, please do not initialize repeatedly"
263 )
264 })?;
265 }
266
267 if config.discovery.is_some() {
268 let discovery_client = DiscoveryClient::new(config);
269 discovery_client.register().await?;
270 let discovery = Discovery::new(discovery_client).await;
271 DISCOVERY.set(discovery).map_err(|_| {
272 anyhow::anyhow!(
273 "discovery has already been initialized, please do not initialize repeatedly"
274 )
275 })?;
276 }
277
278 Ok(())
279 }
280}
281
282/// Initialize configuration center and registry center
283pub async fn init() {
284 match Conreg::init(None).await {
285 Ok(_) => {}
286 Err(e) => {
287 log::error!("conreg init failed: {}", e);
288 exit(1);
289 }
290 };
291}
292
293/// Initialize configuration center and registry center from configuration file
294pub async fn init_from_file(path: impl Into<PathBuf>) {
295 match Conreg::init(Some(path.into())).await {
296 Ok(_) => {}
297 Err(e) => {
298 log::error!("conreg init failed: {}", e);
299 exit(1);
300 }
301 };
302}
303
304/// Initialize from custom configuration
305pub async fn init_with(config: ConRegConfig) {
306 match Conreg::init_with(&config).await {
307 Ok(_) => {}
308 Err(e) => {
309 log::error!("conreg init failed: {}", e);
310 exit(1);
311 }
312 };
313}
314
315/// Application Configuration
316pub struct AppConfig;
317impl AppConfig {
318 fn reload(configs: Configs) {
319 match CONFIGS.get() {
320 None => {
321 log::error!("config not init");
322 }
323 Some(config) => {
324 *config.write().unwrap() = configs;
325 }
326 }
327 }
328
329 /// Get configuration value
330 ///
331 /// `key` is the key of the configuration item, such as `app.name`.
332 ///
333 /// Note: The type of the obtained value needs to be consistent with the type of the value in the configuration.
334 /// If they are inconsistent, it may cause conversion failure. When conversion fails, `None` will be returned.
335 pub fn get<V: DeserializeOwned>(key: &str) -> Option<V> {
336 match CONFIGS.get() {
337 None => {
338 log::error!("config not init");
339 None
340 }
341 Some(config) => match config.read().expect("read lock error").get(key) {
342 None => None,
343 Some(value) => match serde_yaml::from_value::<V>(value.clone()) {
344 Ok(value) => Some(value),
345 Err(e) => {
346 log::error!("parse config failed, {}", e);
347 None
348 }
349 },
350 },
351 }
352 }
353
354 /// Add configuration listener
355 ///
356 /// - `config_id`: Configuration ID
357 /// - `handler`: Configuration listener function, parameter is the changed, merged and flattened configuration content
358 pub fn add_listener(config_id: &str, handler: fn(&HashMap<String, serde_yaml::Value>)) {
359 Configs::add_listener(config_id, handler);
360 }
361}
362
363/// Service Discovery
364pub struct AppDiscovery;
365impl AppDiscovery {
366 /// Get available service instances for the specified service
367 pub async fn get_instances(service_id: &str) -> anyhow::Result<Vec<Instance>> {
368 match DISCOVERY.get() {
369 Some(discovery) => {
370 let instances = discovery.get_instances(service_id).await;
371 Ok(instances)
372 }
373 None => {
374 bail!("discovery not initialized")
375 }
376 }
377 }
378}
379
380#[cfg(test)]
381#[allow(unused)]
382mod tests {
383 use super::*;
384 use crate::conf::{ClientConfigBuilder, ConRegConfigBuilder, DiscoveryConfigBuilder};
385 use serde::Deserialize;
386 use std::collections::HashMap;
387 #[tokio::test]
388 async fn test_config() {
389 //init_log();
390 init().await;
391 //init_from_file("bootstrap.yaml").await;
392 /*init_with(
393 ConRegConfigBuilder::default()
394 .config(
395 ConfigConfigBuilder::default()
396 .server_addr("127.0.0.1:8000")
397 .namespace("public")
398 .config_ids(vec!["test.yaml".into()])
399 .auth_token(Some("2cTtsBUpor".to_string()))
400 .build()
401 .unwrap(),
402 )
403 .build()
404 .unwrap(),
405 )
406 .await;*/
407 println!("{:?}", AppConfig::get::<String>("name"));
408 println!("{:?}", AppConfig::get::<u32>("age"));
409
410 AppConfig::add_listener("test.yaml", |config| {
411 println!("Listen config change1: {:?}", config);
412 });
413 AppConfig::add_listener("test2.yml", |config| {
414 println!("Listen config change2: {:?}", config);
415 });
416 let h = tokio::spawn(async move {
417 loop {
418 println!("{:?}", AppConfig::get::<String>("name"));
419 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
420 }
421 });
422 tokio::join!(h);
423 }
424
425 #[tokio::test]
426 async fn test_discovery() {
427 //init_log();
428 init().await;
429 // let config = ConRegConfigBuilder::default()
430 // .service_id("your_service_id")
431 // .client(
432 // ClientConfigBuilder::default()
433 // .address("127.0.0.1")
434 // .port(8080)
435 // .build()
436 // .unwrap(),
437 // )
438 // .discovery(
439 // DiscoveryConfigBuilder::default()
440 // .server_addr(vec!["127.0.0.1:8000", "127.0.0.1:8001"])
441 // .build()
442 // .unwrap(),
443 // )
444 // .build()
445 // .unwrap();
446 // // println!("config: {:?}", config);
447 // let service_id = config.service_id.clone();
448 // init_with(config).await;
449 let h = tokio::spawn(async move {
450 loop {
451 println!("{:?}", AppConfig::get::<String>("name"));
452 let instances = AppDiscovery::get_instances(utils::current_process_name().as_str())
453 .await
454 .unwrap();
455 println!("current: {:?}", instances);
456 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
457 }
458 });
459 tokio::join!(h);
460 }
461}