use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Nodes not seen within this window are treated as stale and skipped.
const STALE_AFTER: Duration = Duration::from_secs(30);

/// Tracks known edge nodes and decides where batches of tasks should run.
///
/// Clones are cheap and share the same node table through the `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct EdgeManager {
    nodes: Arc<Mutex<HashMap<String, EdgeNode>>>,
    local_node_id: String,
}

/// Identity, health, capacity, and runtime statistics for one edge node.
#[derive(Clone, Debug)]
pub struct EdgeNode {
    pub node_id: String,
    pub region: String,
    pub location: String,
    pub last_seen: Instant,
    pub latency_ms: u64,
    pub is_healthy: bool,

    // Capacity.
    pub max_threads: usize,
    pub available_threads: usize,
    pub queue_capacity: usize,
    /// Fraction of capacity in use, expected in `0.0..=1.0`.
    pub current_load: f64,

    // Runtime statistics.
    pub tasks_processed: u64,
    pub total_uptime: Duration,
    pub avg_response_time: Duration,
}

/// How a batch of tasks is split between the local node and remote nodes.
#[derive(Debug, Clone)]
pub struct TaskDistribution {
    pub local_tasks: usize,
    pub remote_tasks: Vec<RemoteTask>,
}

/// A share of a batch assigned to a remote node, with the routing rationale.
#[derive(Debug, Clone)]
pub struct RemoteTask {
    pub node_id: String,
    pub task_count: usize,
    pub reason: String,
}

/// Policies for deciding where tasks run.
#[derive(Debug, Clone, PartialEq)]
pub enum DistributionStrategy {
    /// Keep everything on the local node.
    LocalOnly,
    /// Prefer the node with the lowest measured latency.
    LatencyBased,
    /// Prefer the node with the lowest current load.
    LoadBased,
    /// Spread tasks evenly across healthy nodes.
    RoundRobin,
    /// Rank nodes by a combined latency/load/response-time score.
    Adaptive,
}

impl EdgeManager {
    pub fn new(node_id: impl Into<String>, region: impl Into<String>) -> Self {
        let local_id = node_id.into();
        let mut nodes = HashMap::new();

        // Seed the registry with the local node and default capacity figures.
        nodes.insert(local_id.clone(), EdgeNode {
            node_id: local_id.clone(),
            region: region.into(),
            location: "local".to_string(),
            last_seen: Instant::now(),
            latency_ms: 0,
            is_healthy: true,
            max_threads: 8,
            available_threads: 8,
            queue_capacity: 1024,
            current_load: 0.0,
            tasks_processed: 0,
            total_uptime: Duration::ZERO,
            avg_response_time: Duration::ZERO,
        });

        Self {
            nodes: Arc::new(Mutex::new(nodes)),
            local_node_id: local_id,
        }
    }

    /// Adds a node to the registry, replacing any entry with the same ID.
    pub fn register_node(&self, node: EdgeNode) {
        let mut nodes = self.nodes.lock().unwrap();
        nodes.insert(node.node_id.clone(), node);
    }

    pub fn unregister_node(&self, node_id: &str) {
        let mut nodes = self.nodes.lock().unwrap();
        nodes.remove(node_id);
    }

    pub fn update_node_health(&self, node_id: &str, is_healthy: bool) {
        let mut nodes = self.nodes.lock().unwrap();
        if let Some(node) = nodes.get_mut(node_id) {
            node.is_healthy = is_healthy;
            node.last_seen = Instant::now();
        }
    }

    pub fn update_node_load(&self, node_id: &str, load: f64) {
        let mut nodes = self.nodes.lock().unwrap();
        if let Some(node) = nodes.get_mut(node_id) {
            node.current_load = load;
            node.last_seen = Instant::now();
        }
    }

    /// Returns every node that is healthy and has been seen recently.
    pub fn healthy_nodes(&self) -> Vec<EdgeNode> {
        let nodes = self.nodes.lock().unwrap();
        nodes.values()
            .filter(|n| n.is_healthy && n.last_seen.elapsed() < STALE_AFTER)
            .cloned()
            .collect()
    }

    /// Picks the best target for a single task under the given strategy.
    /// Returns `None` when no healthy, recently seen node is known.
    pub fn find_best_node(&self, strategy: DistributionStrategy) -> Option<String> {
        let nodes = self.nodes.lock().unwrap();
        let healthy: Vec<_> = nodes.values()
            .filter(|n| n.is_healthy && n.last_seen.elapsed() < STALE_AFTER)
            .collect();

        if healthy.is_empty() {
            return None;
        }

        match strategy {
            DistributionStrategy::LocalOnly => {
                Some(self.local_node_id.clone())
            }
            DistributionStrategy::LatencyBased => {
                healthy.iter()
                    .min_by_key(|n| n.latency_ms)
                    .map(|n| n.node_id.clone())
            }
            DistributionStrategy::LoadBased => {
                // `total_cmp` imposes a total order on f64, so a NaN load
                // cannot panic the way `partial_cmp().unwrap()` would.
                healthy.iter()
                    .min_by(|a, b| a.current_load.total_cmp(&b.current_load))
                    .map(|n| n.node_id.clone())
            }
            DistributionStrategy::RoundRobin => {
                // Approximates round robin by picking the least-used node.
                healthy.iter()
                    .min_by_key(|n| n.tasks_processed)
                    .map(|n| n.node_id.clone())
            }
            DistributionStrategy::Adaptive => {
                healthy.iter()
                    .min_by(|a, b| {
                        let score_a = self.adaptive_score(a);
                        let score_b = self.adaptive_score(b);
                        score_a.total_cmp(&score_b)
                    })
                    .map(|n| n.node_id.clone())
            }
        }
    }

    /// Splits `task_count` tasks between the local node and healthy remotes.
    pub fn distribute_tasks(
        &self,
        task_count: usize,
        strategy: DistributionStrategy,
    ) -> TaskDistribution {
        if strategy == DistributionStrategy::LocalOnly {
            return TaskDistribution {
                local_tasks: task_count,
                remote_tasks: vec![],
            };
        }

        let nodes = self.nodes.lock().unwrap();
        let healthy: Vec<_> = nodes.values()
            .filter(|n| n.is_healthy && n.last_seen.elapsed() < STALE_AFTER)
            .collect();

        // With at most one live node there is nothing to spread; run locally.
        if healthy.len() <= 1 {
            return TaskDistribution {
                local_tasks: task_count,
                remote_tasks: vec![],
            };
        }

        let mut distribution = TaskDistribution {
            local_tasks: 0,
            remote_tasks: vec![],
        };

        match strategy {
            DistributionStrategy::LoadBased => {
                // Weight each node by its spare capacity (1.0 - load),
                // clamped so overloaded nodes contribute nothing.
                let total_capacity: f64 = healthy.iter()
                    .map(|n| (1.0 - n.current_load).max(0.0))
                    .sum();

                // If every node is saturated there is no spare capacity to
                // weight by; keep the batch local rather than divide by zero.
                if total_capacity <= 0.0 {
                    distribution.local_tasks = task_count;
                    return distribution;
                }

                let mut assigned = 0;
                for node in healthy {
                    let capacity_ratio = (1.0 - node.current_load).max(0.0) / total_capacity;
                    // Floor rather than round so the shares can never sum
                    // past `task_count`.
                    let node_tasks = (task_count as f64 * capacity_ratio).floor() as usize;
                    assigned += node_tasks;

                    if node.node_id == self.local_node_id {
                        distribution.local_tasks = node_tasks;
                    } else {
                        distribution.remote_tasks.push(RemoteTask {
                            node_id: node.node_id.clone(),
                            task_count: node_tasks,
                            reason: format!("Load-based: {:.1}% load", node.current_load * 100.0),
                        });
                    }
                }
                // Flooring can strand a few tasks; keep the leftovers local.
                distribution.local_tasks += task_count - assigned;
            }
            DistributionStrategy::RoundRobin => {
                let per_node = task_count / healthy.len();
                let remainder = task_count % healthy.len();

                // The first `remainder` nodes each absorb one extra task.
                for (i, node) in healthy.iter().enumerate() {
                    let node_tasks = per_node + if i < remainder { 1 } else { 0 };

                    if node.node_id == self.local_node_id {
                        distribution.local_tasks = node_tasks;
                    } else {
                        distribution.remote_tasks.push(RemoteTask {
                            node_id: node.node_id.clone(),
                            task_count: node_tasks,
                            reason: "Round-robin".to_string(),
                        });
                    }
                }
            }
            _ => {
                // The remaining strategies route one task at a time through
                // `find_best_node`; whole batches stay local.
                distribution.local_tasks = task_count;
            }
        }

        distribution
    }

    /// Returns a snapshot of a single node, if known.
    pub fn node_stats(&self, node_id: &str) -> Option<EdgeNode> {
        let nodes = self.nodes.lock().unwrap();
        nodes.get(node_id).cloned()
    }

    /// Serializes the node table as JSON.
    ///
    /// The JSON is assembled by hand and values are not escaped, so this
    /// assumes node IDs and regions contain no quotes or backslashes.
    pub fn to_json(&self) -> String {
        let nodes = self.nodes.lock().unwrap();
        let node_list: Vec<String> = nodes.values()
            .map(|n| format!(
                r#"    {{
      "node_id": "{}",
      "region": "{}",
      "healthy": {},
      "latency_ms": {},
      "load": {:.2},
      "available_threads": {}
    }}"#,
                n.node_id,
                n.region,
                n.is_healthy,
                n.latency_ms,
                n.current_load,
                n.available_threads
            ))
            .collect();

        format!(
            r#"{{
  "local_node": "{}",
  "node_count": {},
  "nodes": [
{}
  ]
}}"#,
            self.local_node_id,
            nodes.len(),
            node_list.join(",\n")
        )
    }

    /// Scores a node for `Adaptive` selection; lower is better.
    /// Blends normalized latency, current load, and average response time.
    fn adaptive_score(&self, node: &EdgeNode) -> f64 {
        let latency_factor = node.latency_ms as f64 / 100.0;
        let load_factor = node.current_load * 10.0;
        let response_factor = node.avg_response_time.as_millis() as f64 / 10.0;

        latency_factor + load_factor + response_factor
    }
}

impl std::fmt::Display for EdgeNode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "EdgeNode[id={}, region={}, latency={}ms, load={:.1}%, healthy={}]",
            self.node_id,
            self.region,
            self.latency_ms,
            self.current_load * 100.0,
            self.is_healthy
        )
    }
}

impl std::fmt::Display for TaskDistribution {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Distribution[local={}, remote={}]",
            self.local_tasks,
            self.remote_tasks.len()
        )
    }
}
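
// A minimal usage sketch for this module, kept as tests. The node names,
// regions, latencies, and load figures below are illustrative assumptions,
// not values prescribed by the module itself.
#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical helper: builds a fresh, healthy remote node for tests.
    fn remote(node_id: &str, latency_ms: u64, load: f64) -> EdgeNode {
        EdgeNode {
            node_id: node_id.to_string(),
            region: "eu-west".to_string(),
            location: "remote".to_string(),
            last_seen: Instant::now(),
            latency_ms,
            is_healthy: true,
            max_threads: 8,
            available_threads: 8,
            queue_capacity: 1024,
            current_load: load,
            tasks_processed: 0,
            total_uptime: Duration::ZERO,
            avg_response_time: Duration::ZERO,
        }
    }

    #[test]
    fn load_based_split_assigns_every_task() {
        let manager = EdgeManager::new("local-1", "us-east");
        manager.register_node(remote("edge-a", 20, 0.25));
        manager.register_node(remote("edge-b", 35, 0.75));

        let dist = manager.distribute_tasks(100, DistributionStrategy::LoadBased);
        let remote_total: usize = dist.remote_tasks.iter().map(|t| t.task_count).sum();

        // Flooring plus the local leftover keeps the split exhaustive.
        assert_eq!(dist.local_tasks + remote_total, 100);
    }

    #[test]
    fn latency_based_prefers_the_fastest_node() {
        let manager = EdgeManager::new("local-1", "us-east");
        manager.register_node(remote("edge-a", 20, 0.25));

        // The local node registers with 0 ms latency, so it wins here.
        assert_eq!(
            manager.find_best_node(DistributionStrategy::LatencyBased),
            Some("local-1".to_string())
        );

        manager.unregister_node("local-1");
        assert_eq!(
            manager.find_best_node(DistributionStrategy::LatencyBased),
            Some("edge-a".to_string())
        );
    }
}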