Skip to main content

wae_service/
lib.rs

1//! WAE Service - 服务发现与注册模块
2//!
3//! 提供统一的服务发现与注册抽象,支持多种注册中心。
4//!
5//! 深度融合 tokio 运行时,支持:
6//! - 服务注册与注销
7//! - 服务发现与健康检查
8//! - 负载均衡策略
9//! - 服务元数据管理
10#![warn(missing_docs)]
11
12pub mod discovery;
13pub mod registry;
14
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::{collections::HashMap, net::SocketAddr, time::Duration};
19
20pub use wae_types::{WaeError, WaeResult};
21
22/// 服务发现结果类型
23pub type ServiceResult<T> = WaeResult<T>;
24
25/// 服务发现错误类型
26pub type ServiceError = WaeError;
27
28/// 服务实例信息
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct ServiceInstance {
31    /// 服务唯一标识
32    pub id: String,
33    /// 服务名称
34    pub name: String,
35    /// 服务地址
36    pub addr: SocketAddr,
37    /// 服务元数据
38    pub metadata: HashMap<String, String>,
39    /// 服务标签
40    pub tags: Vec<String>,
41    /// 注册时间
42    pub registered_at: DateTime<Utc>,
43    /// 最后心跳时间
44    pub last_heartbeat: DateTime<Utc>,
45    /// 服务权重 (用于负载均衡)
46    pub weight: u32,
47    /// 服务状态
48    pub status: ServiceStatus,
49}
50
51/// 服务状态
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
53pub enum ServiceStatus {
54    /// 健康
55    Healthy,
56    /// 不健康
57    Unhealthy,
58    /// 维护中
59    Maintenance,
60}
61
62/// 服务注册配置
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ServiceRegistration {
65    /// 服务名称
66    pub name: String,
67    /// 服务地址
68    pub addr: SocketAddr,
69    /// 服务元数据
70    pub metadata: HashMap<String, String>,
71    /// 服务标签
72    pub tags: Vec<String>,
73    /// 服务权重
74    pub weight: u32,
75    /// 心跳间隔
76    pub heartbeat_interval: Duration,
77    /// 健康检查配置
78    pub health_check: Option<HealthCheckConfig>,
79}
80
81impl Default for ServiceRegistration {
82    fn default() -> Self {
83        Self {
84            name: "unnamed-service".to_string(),
85            addr: "0.0.0.0:3000".parse().unwrap(),
86            metadata: HashMap::new(),
87            tags: Vec::new(),
88            weight: 1,
89            heartbeat_interval: Duration::from_secs(10),
90            health_check: None,
91        }
92    }
93}
94
95/// 健康检查配置
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct HealthCheckConfig {
98    /// 健康检查路径
99    pub path: String,
100    /// 健康检查间隔
101    pub interval: Duration,
102    /// 健康检查超时
103    pub timeout: Duration,
104    /// 失败阈值
105    pub failure_threshold: u32,
106}
107
108impl Default for HealthCheckConfig {
109    fn default() -> Self {
110        Self {
111            path: "/health".to_string(),
112            interval: Duration::from_secs(30),
113            timeout: Duration::from_secs(5),
114            failure_threshold: 3,
115        }
116    }
117}
118
119/// 服务注册中心 Trait
120#[async_trait]
121pub trait ServiceRegistry: Send + Sync {
122    /// 注册服务
123    async fn register(&self, instance: &ServiceInstance) -> ServiceResult<()>;
124
125    /// 注销服务
126    async fn deregister(&self, service_id: &str) -> ServiceResult<()>;
127
128    /// 发送心跳
129    async fn heartbeat(&self, service_id: &str) -> ServiceResult<()>;
130
131    /// 更新服务状态
132    async fn update_status(&self, service_id: &str, status: ServiceStatus) -> ServiceResult<()>;
133}
134
135/// 服务发现 Trait
136#[async_trait]
137pub trait ServiceDiscovery: Send + Sync {
138    /// 发现服务实例
139    async fn discover(&self, service_name: &str) -> ServiceResult<Vec<ServiceInstance>>;
140
141    /// 发现单个服务实例 (根据负载均衡策略)
142    async fn discover_one(&self, service_name: &str) -> ServiceResult<Option<ServiceInstance>>;
143
144    /// 订阅服务变更
145    async fn subscribe(&self, service_name: &str) -> ServiceResult<Box<dyn ServiceSubscription>>;
146}
147
148/// 服务订阅 Trait
149///
150/// 使用 `async_trait` 宏使其兼容 dyn。
151#[async_trait]
152pub trait ServiceSubscription: Send + Sync {
153    /// 获取服务实例列表
154    async fn instances(&self) -> ServiceResult<Vec<ServiceInstance>>;
155
156    /// 等待服务变更
157    async fn wait_for_change(&mut self) -> ServiceResult<Vec<ServiceInstance>>;
158}
159
160/// 负载均衡策略
161#[derive(Debug, Clone, Copy, Default)]
162pub enum LoadBalanceStrategy {
163    /// 轮询
164    #[default]
165    RoundRobin,
166    /// 随机
167    Random,
168    /// 加权轮询
169    WeightedRoundRobin,
170    /// 最少连接
171    LeastConnections,
172}
173
174/// 负载均衡器
175pub struct LoadBalancer {
176    strategy: LoadBalanceStrategy,
177    current_index: std::sync::atomic::AtomicUsize,
178}
179
180impl LoadBalancer {
181    /// 创建负载均衡器
182    pub fn new(strategy: LoadBalanceStrategy) -> Self {
183        Self { strategy, current_index: std::sync::atomic::AtomicUsize::new(0) }
184    }
185
186    /// 选择服务实例
187    pub fn select(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
188        if instances.is_empty() {
189            return None;
190        }
191
192        match self.strategy {
193            LoadBalanceStrategy::RoundRobin => self.round_robin(instances),
194            LoadBalanceStrategy::Random => self.random(instances),
195            LoadBalanceStrategy::WeightedRoundRobin => self.weighted_round_robin(instances),
196            LoadBalanceStrategy::LeastConnections => self.least_connections(instances),
197        }
198    }
199
200    fn round_robin(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
201        let index = self.current_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % instances.len();
202        instances.get(index).cloned()
203    }
204
205    fn random(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
206        use std::{
207            collections::hash_map::RandomState,
208            hash::{BuildHasher, Hasher},
209        };
210
211        let state = RandomState::new();
212        let mut hasher = state.build_hasher();
213        hasher.write_u64(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() as u64);
214        let index = hasher.finish() as usize % instances.len();
215        instances.get(index).cloned()
216    }
217
218    fn weighted_round_robin(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
219        let total_weight: u32 = instances.iter().map(|i| i.weight).sum();
220        if total_weight == 0 {
221            return self.round_robin(instances);
222        }
223
224        let mut random_value =
225            (self.current_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % total_weight as usize) as u32;
226
227        for instance in instances {
228            if random_value < instance.weight {
229                return Some(instance.clone());
230            }
231            random_value -= instance.weight;
232        }
233
234        instances.last().cloned()
235    }
236
237    fn least_connections(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
238        instances.iter().min_by_key(|i| i.weight).cloned()
239    }
240}
241
242impl Default for LoadBalancer {
243    fn default() -> Self {
244        Self::new(LoadBalanceStrategy::default())
245    }
246}