1use crossbeam::channel::{bounded, Sender, Receiver};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
7use std::thread;
8use std::time::{Duration, Instant};
9
/// Messages exchanged between agents over their channels.
#[derive(Clone, Debug)]
pub enum AgentMessage {
    /// A unit of work: `data` is processed by the receiving agent and the
    /// outcome is reported back in a [`AgentMessage::Result`] carrying the
    /// same `task_id`.
    Compute {
        task_id: u64,
        data: Vec<f64>,
    },
    /// The output produced for the `Compute` task identified by `task_id`.
    Result {
        task_id: u64,
        result: Vec<f64>,
    },
    /// A text payload routed between agents; the receiving agent uses `to`
    /// as an index into its outbox when passing the message on.
    Coordinate {
        from: usize,
        to: usize,
        payload: String,
    },
    /// A liveness signal from `agent_id`. No code in this file produces or
    /// consumes it; the unit of `timestamp` is not established here.
    Heartbeat {
        agent_id: usize,
        timestamp: u64,
    },
}
18
19pub struct ComputeAgent {
21 pub id: usize,
22 pub agent_type: AgentType,
23 pub inbox: Receiver<AgentMessage>,
24 pub outbox: Vec<Sender<AgentMessage>>,
25 pub metrics: Arc<AgentMetrics>,
26}
27
/// The role of an agent; it decides which transformation the agent applies
/// to the data of a compute task.
#[derive(Clone, Debug)]
pub enum AgentType {
    /// Row sums of the input interpreted as a square matrix.
    LinearAlgebra,
    /// Element-wise numeric update step.
    Optimization,
    /// Element-wise `sin(pi * x)`.
    Quantum,
    /// Mean, variance and standard deviation of the input.
    Analysis,
    /// Passes the input through unchanged.
    Coordinator,
}
36
/// Lock-free counters describing one agent, shared between the agent thread
/// and the coordinator through an `Arc`.
pub struct AgentMetrics {
    /// Number of messages the agent has taken from its inbox and handled.
    pub messages_processed: AtomicU64,
    /// Accumulated time, in microseconds, spent handling those messages.
    pub compute_time_us: AtomicU64,
    /// Run flag: raised by the agent when it starts, cleared to ask it to stop.
    pub is_active: AtomicBool,
}
43
44impl ComputeAgent {
45 pub fn run(self) {
47 self.metrics.is_active.store(true, Ordering::SeqCst);
48
49 while self.metrics.is_active.load(Ordering::SeqCst) {
50 let deadline = Instant::now() + Duration::from_micros(25);
52
53 while let Ok(msg) = self.inbox.recv_timeout(Duration::from_micros(1)) {
54 let start = Instant::now();
55
56 match msg {
57 AgentMessage::Compute { task_id, data } => {
58 let result = self.process_computation(&data);
59
60 if let Some(coordinator) = self.outbox.get(0) {
62 let _ = coordinator.send(AgentMessage::Result {
63 task_id,
64 result,
65 });
66 }
67 }
68 AgentMessage::Coordinate { from, to, payload } => {
69 if let Some(dest) = self.outbox.get(to) {
71 let _ = dest.send(AgentMessage::Coordinate {
72 from,
73 to,
74 payload,
75 });
76 }
77 }
78 _ => {}
79 }
80
81 let elapsed = start.elapsed().as_micros() as u64;
82 self.metrics.compute_time_us.fetch_add(elapsed, Ordering::Relaxed);
83 self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
84
85 if Instant::now() >= deadline {
86 break; }
88 }
89 }
90 }
91
92 fn process_computation(&self, data: &[f64]) -> Vec<f64> {
94 match self.agent_type {
95 AgentType::LinearAlgebra => {
96 let n = (data.len() as f64).sqrt() as usize;
98 let mut result = vec![0.0; n];
99
100 for i in 0..n {
101 for j in 0..n {
102 result[i] += data[i * n + j];
103 }
104 }
105 result
106 }
107 AgentType::Optimization => {
108 data.iter()
110 .map(|&x| x - 0.01 * (2.0 * x - 1.0)) .collect()
112 }
113 AgentType::Quantum => {
114 data.iter()
116 .map(|&x| (x * std::f64::consts::PI).sin())
117 .collect()
118 }
119 AgentType::Analysis => {
120 let mean = data.iter().sum::<f64>() / data.len() as f64;
122 let variance = data.iter()
123 .map(|&x| (x - mean).powi(2))
124 .sum::<f64>() / data.len() as f64;
125 vec![mean, variance, variance.sqrt()]
126 }
127 AgentType::Coordinator => {
128 data.to_vec()
130 }
131 }
132 }
133}
134
135pub struct SwarmCoordinator {
137 pub agents: Vec<thread::JoinHandle<()>>,
138 pub channels: Vec<Sender<AgentMessage>>,
139 pub metrics: Vec<Arc<AgentMetrics>>,
140 pub topology: SwarmTopology,
141}
142
/// How the outgoing channels of each agent are wired when a swarm is built.
#[derive(Clone, Debug)]
pub enum SwarmTopology {
    /// Every agent can send to every agent.
    Mesh,
    /// Agents are linked as a binary tree: each one reaches its parent and
    /// its children.
    Hierarchical,
    /// Each agent can send only to the next one, wrapping around at the end.
    Ring,
    /// Agent 0 can send to everyone; every other agent can send only to agent 0.
    Star,
}
150
151impl SwarmCoordinator {
152 pub fn new(agent_count: usize, topology: SwarmTopology) -> Self {
154 let mut channels = Vec::new();
155 let mut receivers = Vec::new();
156 let mut metrics = Vec::new();
157
158 for _ in 0..agent_count {
160 let (tx, rx) = bounded(1000);
161 channels.push(tx);
162 receivers.push(rx);
163 metrics.push(Arc::new(AgentMetrics {
164 messages_processed: AtomicU64::new(0),
165 compute_time_us: AtomicU64::new(0),
166 is_active: AtomicBool::new(false),
167 }));
168 }
169
170 let mut agents = Vec::new();
172
173 for i in 0..agent_count {
174 let agent_type = match i % 5 {
175 0 => AgentType::Coordinator,
176 1 => AgentType::LinearAlgebra,
177 2 => AgentType::Optimization,
178 3 => AgentType::Quantum,
179 _ => AgentType::Analysis,
180 };
181
182 let connections = match topology {
184 SwarmTopology::Mesh => {
185 channels.clone()
187 }
188 SwarmTopology::Star => {
189 if i == 0 {
191 channels.clone()
192 } else {
193 vec![channels[0].clone()]
194 }
195 }
196 SwarmTopology::Ring => {
197 vec![channels[(i + 1) % agent_count].clone()]
199 }
200 SwarmTopology::Hierarchical => {
201 let mut conns = vec![];
203 if i > 0 {
204 conns.push(channels[(i - 1) / 2].clone()); }
206 if 2 * i + 1 < agent_count {
207 conns.push(channels[2 * i + 1].clone()); }
209 if 2 * i + 2 < agent_count {
210 conns.push(channels[2 * i + 2].clone()); }
212 conns
213 }
214 };
215
216 let agent = ComputeAgent {
217 id: i,
218 agent_type,
219 inbox: receivers.pop().unwrap(),
220 outbox: connections,
221 metrics: metrics[i].clone(),
222 };
223
224 agents.push(thread::spawn(move || agent.run()));
225 }
226
227 SwarmCoordinator {
228 agents,
229 channels,
230 metrics,
231 topology,
232 }
233 }
234
235 pub fn submit_task(&self, task_id: u64, data: Vec<f64>) -> Result<(), String> {
237 for (i, channel) in self.channels.iter().enumerate() {
239 if i % 5 == 0 { channel.send(AgentMessage::Compute { task_id, data: data.clone() })
241 .map_err(|e| format!("Failed to submit task: {}", e))?;
242 }
243 }
244 Ok(())
245 }
246
247 pub fn get_metrics(&self) -> SwarmMetrics {
249 let mut total_messages = 0;
250 let mut total_compute_us = 0;
251 let mut active_agents = 0;
252
253 for metric in &self.metrics {
254 total_messages += metric.messages_processed.load(Ordering::Relaxed);
255 total_compute_us += metric.compute_time_us.load(Ordering::Relaxed);
256 if metric.is_active.load(Ordering::Relaxed) {
257 active_agents += 1;
258 }
259 }
260
261 SwarmMetrics {
262 total_messages,
263 total_compute_us,
264 active_agents,
265 agent_count: self.metrics.len(),
266 throughput: if total_compute_us > 0 {
267 (total_messages as f64 * 1_000_000.0) / total_compute_us as f64
268 } else {
269 0.0
270 },
271 }
272 }
273
274 pub fn shutdown(&self) {
276 for metric in &self.metrics {
277 metric.is_active.store(false, Ordering::SeqCst);
278 }
279 }
280}
281
/// Aggregated view of the per-agent counters of a swarm.
#[derive(Debug)]
pub struct SwarmMetrics {
    /// Sum of the messages handled by all agents.
    pub total_messages: u64,
    /// Sum of the handling time recorded by all agents, in microseconds.
    pub total_compute_us: u64,
    /// Number of agents whose run flag was set when the snapshot was taken.
    pub active_agents: usize,
    /// Number of agents in the swarm.
    pub agent_count: usize,
    /// Messages handled per second of recorded handling time
    /// (`total_messages * 1e6 / total_compute_us`), or `0.0` when no handling
    /// time has been recorded.
    pub throughput: f64,
}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built mesh swarm reports all of its agents and, once the
    /// threads have had time to start, at least one of them as active.
    #[test]
    fn test_real_swarm_creation() {
        let swarm = SwarmCoordinator::new(10, SwarmTopology::Mesh);

        // Give the agent threads a moment to start and raise their run flag.
        thread::sleep(Duration::from_millis(10));

        let snapshot = swarm.get_metrics();
        assert_eq!(snapshot.agent_count, 10);
        assert!(snapshot.active_agents > 0);

        swarm.shutdown();
    }

    /// A submitted task is picked up and shows in the message counter.
    #[test]
    fn test_task_submission() {
        let swarm = SwarmCoordinator::new(5, SwarmTopology::Star);

        swarm.submit_task(1, vec![1.0, 2.0, 3.0, 4.0]).unwrap();

        // Allow the receiving agent time to take the task off its inbox.
        thread::sleep(Duration::from_millis(50));

        let snapshot = swarm.get_metrics();
        assert!(snapshot.total_messages > 0);

        swarm.shutdown();
    }
}