avx_async/
edge.rs

1//! Edge Computing - Distributed runtime capabilities
2//!
3//! Provides edge computing features for distributed async runtime execution
4
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7use std::collections::HashMap;
8
9/// Edge Computing Manager
10#[derive(Clone)]
11pub struct EdgeManager {
12    nodes: Arc<Mutex<HashMap<String, EdgeNode>>>,
13    local_node_id: String,
14}
15
16#[derive(Clone, Debug)]
17pub struct EdgeNode {
18    pub node_id: String,
19    pub region: String,
20    pub location: String,
21    pub last_seen: Instant,
22    pub latency_ms: u64,
23    pub is_healthy: bool,
24
25    // Capacity
26    pub max_threads: usize,
27    pub available_threads: usize,
28    pub queue_capacity: usize,
29    pub current_load: f64,
30
31    // Statistics
32    pub tasks_processed: u64,
33    pub total_uptime: Duration,
34    pub avg_response_time: Duration,
35}
36
37#[derive(Debug, Clone)]
38pub struct TaskDistribution {
39    pub local_tasks: usize,
40    pub remote_tasks: Vec<RemoteTask>,
41}
42
43#[derive(Debug, Clone)]
44pub struct RemoteTask {
45    pub node_id: String,
46    pub task_count: usize,
47    pub reason: String,
48}
49
50#[derive(Debug, Clone, PartialEq)]
51pub enum DistributionStrategy {
52    /// Process tasks locally only
53    LocalOnly,
54    /// Distribute based on latency
55    LatencyBased,
56    /// Distribute based on node load
57    LoadBased,
58    /// Balance across all nodes
59    RoundRobin,
60    /// Smart distribution based on multiple factors
61    Adaptive,
62}
63
64impl EdgeManager {
65    pub fn new(node_id: impl Into<String>, region: impl Into<String>) -> Self {
66        let local_id = node_id.into();
67        let mut nodes = HashMap::new();
68
69        // Register local node
70        nodes.insert(local_id.clone(), EdgeNode {
71            node_id: local_id.clone(),
72            region: region.into(),
73            location: "local".to_string(),
74            last_seen: Instant::now(),
75            latency_ms: 0,
76            is_healthy: true,
77            max_threads: 8,
78            available_threads: 8,
79            queue_capacity: 1024,
80            current_load: 0.0,
81            tasks_processed: 0,
82            total_uptime: Duration::ZERO,
83            avg_response_time: Duration::ZERO,
84        });
85
86        Self {
87            nodes: Arc::new(Mutex::new(nodes)),
88            local_node_id: local_id,
89        }
90    }
91
92    /// Register a remote edge node
93    pub fn register_node(&self, node: EdgeNode) {
94        let mut nodes = self.nodes.lock().unwrap();
95        nodes.insert(node.node_id.clone(), node);
96    }
97
98    /// Unregister an edge node
99    pub fn unregister_node(&self, node_id: &str) {
100        let mut nodes = self.nodes.lock().unwrap();
101        nodes.remove(node_id);
102    }
103
104    /// Update node health status
105    pub fn update_node_health(&self, node_id: &str, is_healthy: bool) {
106        let mut nodes = self.nodes.lock().unwrap();
107        if let Some(node) = nodes.get_mut(node_id) {
108            node.is_healthy = is_healthy;
109            node.last_seen = Instant::now();
110        }
111    }
112
113    /// Update node load
114    pub fn update_node_load(&self, node_id: &str, load: f64) {
115        let mut nodes = self.nodes.lock().unwrap();
116        if let Some(node) = nodes.get_mut(node_id) {
117            node.current_load = load;
118            node.last_seen = Instant::now();
119        }
120    }
121
122    /// Get all healthy nodes
123    pub fn healthy_nodes(&self) -> Vec<EdgeNode> {
124        let nodes = self.nodes.lock().unwrap();
125        nodes.values()
126            .filter(|n| n.is_healthy && n.last_seen.elapsed() < Duration::from_secs(30))
127            .cloned()
128            .collect()
129    }
130
131    /// Find best node for task execution
132    pub fn find_best_node(&self, strategy: DistributionStrategy) -> Option<String> {
133        let nodes = self.nodes.lock().unwrap();
134        let healthy: Vec<_> = nodes.values()
135            .filter(|n| n.is_healthy && n.last_seen.elapsed() < Duration::from_secs(30))
136            .collect();
137
138        if healthy.is_empty() {
139            return None;
140        }
141
142        match strategy {
143            DistributionStrategy::LocalOnly => {
144                Some(self.local_node_id.clone())
145            }
146            DistributionStrategy::LatencyBased => {
147                healthy.iter()
148                    .min_by_key(|n| n.latency_ms)
149                    .map(|n| n.node_id.clone())
150            }
151            DistributionStrategy::LoadBased => {
152                healthy.iter()
153                    .min_by(|a, b| a.current_load.partial_cmp(&b.current_load).unwrap())
154                    .map(|n| n.node_id.clone())
155            }
156            DistributionStrategy::RoundRobin => {
157                // Simple round-robin: pick node with least processed tasks
158                healthy.iter()
159                    .min_by_key(|n| n.tasks_processed)
160                    .map(|n| n.node_id.clone())
161            }
162            DistributionStrategy::Adaptive => {
163                // Score based on latency, load, and response time
164                healthy.iter()
165                    .min_by(|a, b| {
166                        let score_a = self.adaptive_score(a);
167                        let score_b = self.adaptive_score(b);
168                        score_a.partial_cmp(&score_b).unwrap()
169                    })
170                    .map(|n| n.node_id.clone())
171            }
172        }
173    }
174
175    /// Distribute tasks across nodes
176    pub fn distribute_tasks(
177        &self,
178        task_count: usize,
179        strategy: DistributionStrategy,
180    ) -> TaskDistribution {
181        if strategy == DistributionStrategy::LocalOnly {
182            return TaskDistribution {
183                local_tasks: task_count,
184                remote_tasks: vec![],
185            };
186        }
187
188        let nodes = self.nodes.lock().unwrap();
189        let healthy: Vec<_> = nodes.values()
190            .filter(|n| n.is_healthy && n.last_seen.elapsed() < Duration::from_secs(30))
191            .collect();
192
193        if healthy.len() <= 1 {
194            return TaskDistribution {
195                local_tasks: task_count,
196                remote_tasks: vec![],
197            };
198        }
199
200        let mut distribution = TaskDistribution {
201            local_tasks: 0,
202            remote_tasks: vec![],
203        };
204
205        match strategy {
206            DistributionStrategy::LoadBased => {
207                // Distribute inversely proportional to load
208                let total_capacity: f64 = healthy.iter()
209                    .map(|n| 1.0 - n.current_load)
210                    .sum();
211
212                for node in healthy {
213                    let capacity_ratio = (1.0 - node.current_load) / total_capacity;
214                    let node_tasks = (task_count as f64 * capacity_ratio).round() as usize;
215
216                    if node.node_id == self.local_node_id {
217                        distribution.local_tasks = node_tasks;
218                    } else {
219                        distribution.remote_tasks.push(RemoteTask {
220                            node_id: node.node_id.clone(),
221                            task_count: node_tasks,
222                            reason: format!("Load-based: {:.1}% load", node.current_load * 100.0),
223                        });
224                    }
225                }
226            }
227            DistributionStrategy::RoundRobin => {
228                // Evenly distribute
229                let per_node = task_count / healthy.len();
230                let remainder = task_count % healthy.len();
231
232                for (i, node) in healthy.iter().enumerate() {
233                    let node_tasks = per_node + if i < remainder { 1 } else { 0 };
234
235                    if node.node_id == self.local_node_id {
236                        distribution.local_tasks = node_tasks;
237                    } else {
238                        distribution.remote_tasks.push(RemoteTask {
239                            node_id: node.node_id.clone(),
240                            task_count: node_tasks,
241                            reason: "Round-robin".to_string(),
242                        });
243                    }
244                }
245            }
246            _ => {
247                // Default: local only
248                distribution.local_tasks = task_count;
249            }
250        }
251
252        distribution
253    }
254
255    /// Get node statistics
256    pub fn node_stats(&self, node_id: &str) -> Option<EdgeNode> {
257        let nodes = self.nodes.lock().unwrap();
258        nodes.get(node_id).cloned()
259    }
260
261    /// Export edge topology as JSON
262    pub fn to_json(&self) -> String {
263        let nodes = self.nodes.lock().unwrap();
264        let node_list: Vec<String> = nodes.values()
265            .map(|n| format!(
266                r#"    {{
267      "node_id": "{}",
268      "region": "{}",
269      "healthy": {},
270      "latency_ms": {},
271      "load": {:.2},
272      "available_threads": {}
273    }}"#,
274                n.node_id,
275                n.region,
276                n.is_healthy,
277                n.latency_ms,
278                n.current_load,
279                n.available_threads
280            ))
281            .collect();
282
283        format!(
284            r#"{{
285  "local_node": "{}",
286  "node_count": {},
287  "nodes": [
288{}
289  ]
290}}"#,
291            self.local_node_id,
292            nodes.len(),
293            node_list.join(",\n")
294        )
295    }
296
297    fn adaptive_score(&self, node: &EdgeNode) -> f64 {
298        // Lower score is better
299        let latency_factor = node.latency_ms as f64 / 100.0;
300        let load_factor = node.current_load * 10.0;
301        let response_factor = node.avg_response_time.as_millis() as f64 / 10.0;
302
303        latency_factor + load_factor + response_factor
304    }
305}
306
307impl std::fmt::Display for EdgeNode {
308    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309        write!(
310            f,
311            "EdgeNode[id={}, region={}, latency={}ms, load={:.1}%, healthy={}]",
312            self.node_id,
313            self.region,
314            self.latency_ms,
315            self.current_load * 100.0,
316            self.is_healthy
317        )
318    }
319}
320
321impl std::fmt::Display for TaskDistribution {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        write!(
324            f,
325            "Distribution[local={}, remote={}]",
326            self.local_tasks,
327            self.remote_tasks.len()
328        )
329    }
330}
331
332
333
334
335