1#![warn(missing_docs)]
11
12pub mod discovery;
13pub mod registry;
14
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::{collections::HashMap, net::SocketAddr, time::Duration};
19
20pub use wae_types::{WaeError, WaeResult};
21
/// Result type returned by the registry, discovery and subscription traits in this module.
///
/// This is a plain alias for [`WaeResult`].
pub type ServiceResult<T> = WaeResult<T>;
24
/// Error type of this module.
///
/// This is a plain alias for [`WaeError`].
pub type ServiceError = WaeError;
27
/// Description of one instance of a service.
///
/// Values of this type are passed to [`ServiceRegistry::register`], returned by
/// [`ServiceDiscovery`] and chosen between by [`LoadBalancer::select`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInstance {
    /// Identifier of this instance.
    ///
    /// NOTE(review): presumably the same value that [`ServiceRegistry`] methods take as
    /// `service_id` — confirm against the registry implementations.
    pub id: String,
    /// Name of the service this instance belongs to.
    pub name: String,
    /// Socket address (IP and port) of the instance.
    pub addr: SocketAddr,
    /// Arbitrary string key/value metadata attached to the instance.
    pub metadata: HashMap<String, String>,
    /// Arbitrary string tags attached to the instance.
    pub tags: Vec<String>,
    /// UTC timestamp recorded for the registration of this instance.
    pub registered_at: DateTime<Utc>,
    /// UTC timestamp recorded for the most recent heartbeat of this instance.
    pub last_heartbeat: DateTime<Utc>,
    /// Relative weight of the instance.
    ///
    /// Read by [`LoadBalancer`]: the weighted round-robin strategy gives an instance a share
    /// of selections proportional to this value.
    pub weight: u32,
    /// Current status of the instance.
    pub status: ServiceStatus,
}
50
/// Status of a [`ServiceInstance`].
///
/// NOTE(review): nothing in this file filters instances by status; in particular
/// [`LoadBalancer::select`] does not look at it. How each status is acted upon is decided
/// by code outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ServiceStatus {
    /// The instance is considered healthy.
    Healthy,
    /// The instance is considered unhealthy.
    Unhealthy,
    /// The instance is marked as being in maintenance.
    Maintenance,
}
61
/// Parameters describing a service that is to be registered.
///
/// The [`Default`] implementation provides a placeholder name and address, empty metadata
/// and tags, a weight of 1, a 10 second heartbeat interval and no health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceRegistration {
    /// Name of the service.
    pub name: String,
    /// Socket address (IP and port) of the service.
    pub addr: SocketAddr,
    /// Arbitrary string key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Arbitrary string tags.
    pub tags: Vec<String>,
    /// Relative weight of the service (see [`ServiceInstance::weight`]).
    pub weight: u32,
    /// Interval between heartbeats.
    ///
    /// NOTE(review): who sends the heartbeat and how a missed one is handled is not visible
    /// in this file.
    pub heartbeat_interval: Duration,
    /// Optional health check settings; `None` means no health check is configured.
    pub health_check: Option<HealthCheckConfig>,
}
80
81impl Default for ServiceRegistration {
82 fn default() -> Self {
83 Self {
84 name: "unnamed-service".to_string(),
85 addr: "0.0.0.0:3000".parse().unwrap(),
86 metadata: HashMap::new(),
87 tags: Vec::new(),
88 weight: 1,
89 heartbeat_interval: Duration::from_secs(10),
90 health_check: None,
91 }
92 }
93}
94
/// Settings for health-checking a service.
///
/// The [`Default`] implementation uses the path `/health`, a 30 second interval, a 5 second
/// timeout and a failure threshold of 3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
    /// Path of the health check endpoint.
    ///
    /// NOTE(review): presumably an HTTP path relative to the service address — confirm
    /// against the code that performs the check.
    pub path: String,
    /// Time between two health checks.
    pub interval: Duration,
    /// Time limit for a single health check.
    pub timeout: Duration,
    /// Failure count threshold.
    ///
    /// NOTE(review): presumably the number of consecutive failed checks after which the
    /// instance is treated as unhealthy — confirm against the code that performs the check.
    pub failure_threshold: u32,
}
107
108impl Default for HealthCheckConfig {
109 fn default() -> Self {
110 Self {
111 path: "/health".to_string(),
112 interval: Duration::from_secs(30),
113 timeout: Duration::from_secs(5),
114 failure_threshold: 3,
115 }
116 }
117}
118
/// Write side of a service registry: registering instances and keeping them up to date.
///
/// Implementors must be `Send + Sync`. The exact semantics of each operation (for example
/// whether repeating a call is an error) are defined by the implementor and are not
/// visible in this file.
#[async_trait]
pub trait ServiceRegistry: Send + Sync {
    /// Registers `instance` with the registry.
    async fn register(&self, instance: &ServiceInstance) -> ServiceResult<()>;

    /// Removes the registration identified by `service_id`.
    async fn deregister(&self, service_id: &str) -> ServiceResult<()>;

    /// Records a heartbeat for the registration identified by `service_id`.
    async fn heartbeat(&self, service_id: &str) -> ServiceResult<()>;

    /// Sets the status of the registration identified by `service_id` to `status`.
    async fn update_status(&self, service_id: &str, status: ServiceStatus) -> ServiceResult<()>;
}
134
/// Read side of a service registry: looking up instances by service name.
///
/// Implementors must be `Send + Sync`. Whether results are filtered (for example by
/// [`ServiceStatus`]) is defined by the implementor and is not visible in this file.
#[async_trait]
pub trait ServiceDiscovery: Send + Sync {
    /// Returns the instances found for `service_name`.
    async fn discover(&self, service_name: &str) -> ServiceResult<Vec<ServiceInstance>>;

    /// Returns a single instance for `service_name`, or `None` if the implementor has none
    /// to offer.
    ///
    /// NOTE(review): how the single instance is picked is up to the implementor.
    async fn discover_one(&self, service_name: &str) -> ServiceResult<Option<ServiceInstance>>;

    /// Creates a subscription for `service_name`, returned as a boxed
    /// [`ServiceSubscription`].
    async fn subscribe(&self, service_name: &str) -> ServiceResult<Box<dyn ServiceSubscription>>;
}
147
/// Handle returned by [`ServiceDiscovery::subscribe`] for following the instances of one
/// service.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait ServiceSubscription: Send + Sync {
    /// Returns the instances as currently known to the subscription.
    async fn instances(&self) -> ServiceResult<Vec<ServiceInstance>>;

    /// Waits for a change and returns a list of instances.
    ///
    /// Takes `&mut self`, so one subscription can serve only one waiter at a time.
    ///
    /// NOTE(review): presumably resolves with the full instance list after the change —
    /// confirm against the implementations.
    async fn wait_for_change(&mut self) -> ServiceResult<Vec<ServiceInstance>>;
}
159
/// Selection strategy used by [`LoadBalancer`].
///
/// The default strategy is [`LoadBalanceStrategy::RoundRobin`].
#[derive(Debug, Clone, Copy, Default)]
pub enum LoadBalanceStrategy {
    /// Cycle through the instances in slice order using a shared counter.
    #[default]
    RoundRobin,
    /// Pick an instance at a pseudo-random index.
    Random,
    /// Cycle through the instances, giving each a share of selections proportional to its
    /// `weight`.
    WeightedRoundRobin,
    /// Intended to pick the instance with the fewest connections.
    ///
    /// NOTE(review): `LoadBalancer` has no connection counts available and currently
    /// selects the instance with the smallest `weight` for this strategy.
    LeastConnections,
}
173
/// Chooses one [`ServiceInstance`] out of a slice according to a [`LoadBalanceStrategy`].
pub struct LoadBalancer {
    /// Strategy applied by `select`.
    strategy: LoadBalanceStrategy,
    /// Counter advanced on each round-robin or weighted round-robin selection; atomic so
    /// that it can be advanced through `&self`.
    current_index: std::sync::atomic::AtomicUsize,
}
179
180impl LoadBalancer {
181 pub fn new(strategy: LoadBalanceStrategy) -> Self {
183 Self { strategy, current_index: std::sync::atomic::AtomicUsize::new(0) }
184 }
185
186 pub fn select(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
188 if instances.is_empty() {
189 return None;
190 }
191
192 match self.strategy {
193 LoadBalanceStrategy::RoundRobin => self.round_robin(instances),
194 LoadBalanceStrategy::Random => self.random(instances),
195 LoadBalanceStrategy::WeightedRoundRobin => self.weighted_round_robin(instances),
196 LoadBalanceStrategy::LeastConnections => self.least_connections(instances),
197 }
198 }
199
200 fn round_robin(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
201 let index = self.current_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % instances.len();
202 instances.get(index).cloned()
203 }
204
205 fn random(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
206 use std::{
207 collections::hash_map::RandomState,
208 hash::{BuildHasher, Hasher},
209 };
210
211 let state = RandomState::new();
212 let mut hasher = state.build_hasher();
213 hasher.write_u64(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() as u64);
214 let index = hasher.finish() as usize % instances.len();
215 instances.get(index).cloned()
216 }
217
218 fn weighted_round_robin(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
219 let total_weight: u32 = instances.iter().map(|i| i.weight).sum();
220 if total_weight == 0 {
221 return self.round_robin(instances);
222 }
223
224 let mut random_value =
225 (self.current_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % total_weight as usize) as u32;
226
227 for instance in instances {
228 if random_value < instance.weight {
229 return Some(instance.clone());
230 }
231 random_value -= instance.weight;
232 }
233
234 instances.last().cloned()
235 }
236
237 fn least_connections(&self, instances: &[ServiceInstance]) -> Option<ServiceInstance> {
238 instances.iter().min_by_key(|i| i.weight).cloned()
239 }
240}
241
242impl Default for LoadBalancer {
243 fn default() -> Self {
244 Self::new(LoadBalanceStrategy::default())
245 }
246}