1use std::sync::Arc;
7use std::collections::HashMap;
8use async_trait::async_trait;
9use dashmap::DashMap;
10use parking_lot::RwLock;
11use serde::{Serialize, Deserialize};
12use thiserror::Error;
13use uuid::Uuid;
14use chrono::{DateTime, Utc};
15use synaptic_qudag_core::{QuDAGNode, QuDAGNetwork};
16
17#[derive(Error, Debug)]
19pub enum MeshError {
20 #[error("Agent error: {0}")]
21 AgentError(String),
22
23 #[error("Coordination error: {0}")]
24 CoordinationError(String),
25
26 #[error("Network error: {0}")]
27 NetworkError(String),
28
29 #[error("Synchronization error: {0}")]
30 SynchronizationError(String),
31}
32
33pub type Result<T> = std::result::Result<T, MeshError>;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct Capabilities {
38 pub compute_power: f64,
39 pub memory_available: usize,
40 pub specializations: Vec<String>,
41 pub latency_ms: f64,
42}
43
44impl Default for Capabilities {
45 fn default() -> Self {
46 Self {
47 compute_power: 1.0,
48 memory_available: 1024 * 1024 * 1024, specializations: vec!["general".to_string()],
50 latency_ms: 10.0,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct Agent {
58 pub id: Uuid,
59 pub name: String,
60 pub capabilities: Capabilities,
61 pub status: AgentStatus,
62 pub created_at: DateTime<Utc>,
63 pub last_heartbeat: DateTime<Utc>,
64}
65
66#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
68pub enum AgentStatus {
69 Active,
70 Idle,
71 Busy,
72 Offline,
73}
74
75impl Agent {
76 pub fn new(name: impl Into<String>) -> Self {
78 let now = Utc::now();
79 Self {
80 id: Uuid::new_v4(),
81 name: name.into(),
82 capabilities: Capabilities::default(),
83 status: AgentStatus::Idle,
84 created_at: now,
85 last_heartbeat: now,
86 }
87 }
88
89 pub fn heartbeat(&mut self) {
91 self.last_heartbeat = Utc::now();
92 }
93
94 pub fn is_healthy(&self) -> bool {
96 let elapsed = Utc::now().signed_duration_since(self.last_heartbeat);
97 elapsed.num_seconds() < 30 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct Task {
104 pub id: Uuid,
105 pub name: String,
106 pub requirements: TaskRequirements,
107 pub status: TaskStatus,
108 pub assigned_agent: Option<Uuid>,
109 pub created_at: DateTime<Utc>,
110 pub completed_at: Option<DateTime<Utc>>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct TaskRequirements {
116 pub min_compute_power: f64,
117 pub min_memory: usize,
118 pub required_specializations: Vec<String>,
119 pub max_latency_ms: f64,
120}
121
122#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
124pub enum TaskStatus {
125 Pending,
126 Assigned,
127 InProgress,
128 Completed,
129 Failed,
130}
131
132pub struct NeuralMesh {
134 agents: Arc<DashMap<Uuid, Agent>>,
135 tasks: Arc<DashMap<Uuid, Task>>,
136 dag_network: Arc<QuDAGNetwork>,
137 topology: Arc<RwLock<MeshTopology>>,
138}
139
140#[derive(Debug, Clone)]
142pub struct MeshTopology {
143 connections: HashMap<Uuid, Vec<Uuid>>,
144}
145
146impl NeuralMesh {
147 pub fn new() -> Self {
149 Self {
150 agents: Arc::new(DashMap::new()),
151 tasks: Arc::new(DashMap::new()),
152 dag_network: Arc::new(QuDAGNetwork::new()),
153 topology: Arc::new(RwLock::new(MeshTopology {
154 connections: HashMap::new(),
155 })),
156 }
157 }
158
159 pub async fn add_agent(&self, agent: Agent) -> Result<Uuid> {
161 let agent_id = agent.id;
162
163 let node_data = serde_json::to_vec(&agent)
165 .map_err(|e| MeshError::AgentError(format!("Serialization failed: {}", e)))?;
166 let dag_node = QuDAGNode::new(&node_data);
167
168 self.dag_network.add_node(dag_node).await
169 .map_err(|e| MeshError::NetworkError(e.to_string()))?;
170
171 self.agents.insert(agent_id, agent);
172
173 self.update_topology(agent_id).await?;
175
176 Ok(agent_id)
177 }
178
179 pub async fn remove_agent(&self, agent_id: &Uuid) -> Result<()> {
181 self.agents.remove(agent_id);
182
183 let mut topology = self.topology.write();
185 topology.connections.remove(agent_id);
186 for connections in topology.connections.values_mut() {
187 connections.retain(|id| id != agent_id);
188 }
189
190 Ok(())
191 }
192
193 pub fn get_agent(&self, agent_id: &Uuid) -> Option<Agent> {
195 self.agents.get(agent_id).map(|a| a.clone())
196 }
197
198 pub fn list_agents(&self) -> Vec<Agent> {
200 self.agents.iter().map(|a| a.clone()).collect()
201 }
202
203 pub async fn submit_task(&self, name: impl Into<String>, requirements: TaskRequirements) -> Result<Uuid> {
205 let task = Task {
206 id: Uuid::new_v4(),
207 name: name.into(),
208 requirements,
209 status: TaskStatus::Pending,
210 assigned_agent: None,
211 created_at: Utc::now(),
212 completed_at: None,
213 };
214
215 let task_id = task.id;
216 self.tasks.insert(task_id, task);
217
218 self.assign_task(task_id).await?;
220
221 Ok(task_id)
222 }
223
224 async fn assign_task(&self, task_id: Uuid) -> Result<()> {
226 let task = self.tasks.get(&task_id)
227 .ok_or_else(|| MeshError::CoordinationError("Task not found".to_string()))?;
228
229 let suitable_agent = self.find_suitable_agent(&task.requirements);
231
232 if let Some(agent_id) = suitable_agent {
233 drop(task); if let Some(mut task) = self.tasks.get_mut(&task_id) {
237 task.assigned_agent = Some(agent_id);
238 task.status = TaskStatus::Assigned;
239 }
240
241 if let Some(mut agent) = self.agents.get_mut(&agent_id) {
243 agent.status = AgentStatus::Busy;
244 }
245 }
246
247 Ok(())
248 }
249
250 fn find_suitable_agent(&self, requirements: &TaskRequirements) -> Option<Uuid> {
252 self.agents.iter()
253 .filter(|entry| {
254 let agent = entry.value();
255 agent.status == AgentStatus::Idle &&
256 agent.is_healthy() &&
257 agent.capabilities.compute_power >= requirements.min_compute_power &&
258 agent.capabilities.memory_available >= requirements.min_memory &&
259 agent.capabilities.latency_ms <= requirements.max_latency_ms &&
260 requirements.required_specializations.iter().all(|spec| {
261 agent.capabilities.specializations.contains(spec)
262 })
263 })
264 .min_by_key(|entry| {
265 (entry.value().capabilities.latency_ms * 1000.0) as i64
267 })
268 .map(|entry| entry.key().clone())
269 }
270
271 async fn update_topology(&self, new_agent_id: Uuid) -> Result<()> {
273 let mut topology = self.topology.write();
274
275 let agents: Vec<Uuid> = self.agents.iter()
277 .map(|entry| entry.key().clone())
278 .filter(|id| id != &new_agent_id)
279 .take(3)
280 .collect();
281
282 topology.connections.insert(new_agent_id, agents.clone());
283
284 for agent_id in agents {
286 topology.connections.entry(agent_id)
287 .or_insert_with(Vec::new)
288 .push(new_agent_id);
289 }
290
291 Ok(())
292 }
293
294 pub fn get_stats(&self) -> MeshStats {
296 let total_agents = self.agents.len();
297 let active_agents = self.agents.iter()
298 .filter(|a| a.status == AgentStatus::Active || a.status == AgentStatus::Busy)
299 .count();
300 let total_tasks = self.tasks.len();
301 let completed_tasks = self.tasks.iter()
302 .filter(|t| t.status == TaskStatus::Completed)
303 .count();
304
305 MeshStats {
306 total_agents,
307 active_agents,
308 total_tasks,
309 completed_tasks,
310 dag_nodes: self.dag_network.node_count(),
311 }
312 }
313}
314
315impl Default for NeuralMesh {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct MeshStats {
324 pub total_agents: usize,
325 pub active_agents: usize,
326 pub total_tasks: usize,
327 pub completed_tasks: usize,
328 pub dag_nodes: usize,
329}
330
331#[async_trait]
333pub trait CoordinationProtocol {
334 async fn negotiate_task(&self, task: &Task, agents: &[Agent]) -> Result<Option<Uuid>>;
336
337 async fn synchronize(&self) -> Result<()>;
339
340 async fn handle_failure(&self, agent_id: Uuid) -> Result<()>;
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built mesh reports zero agents and zero tasks.
    #[tokio::test]
    async fn test_mesh_creation() {
        let stats = NeuralMesh::new().get_stats();

        assert_eq!(stats.total_agents, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    /// An agent can be added, looked up by id, and removed again.
    #[tokio::test]
    async fn test_agent_management() {
        let mesh = NeuralMesh::new();

        let agent_id = mesh
            .add_agent(Agent::new("test-agent"))
            .await
            .expect("adding an agent should succeed");
        assert_eq!(mesh.get_stats().total_agents, 1);

        let retrieved = mesh
            .get_agent(&agent_id)
            .expect("the added agent should be retrievable");
        assert_eq!(retrieved.name, "test-agent");

        mesh.remove_agent(&agent_id)
            .await
            .expect("removing an agent should succeed");
        assert_eq!(mesh.get_stats().total_agents, 0);
    }

    /// A task whose requirements are met by an idle agent is assigned on
    /// submission.
    #[tokio::test]
    async fn test_task_submission() {
        let mesh = NeuralMesh::new();

        // Worker with more compute than the task asks for; every other
        // capability keeps its default.
        let worker = Agent {
            capabilities: Capabilities {
                compute_power: 2.0,
                ..Capabilities::default()
            },
            ..Agent::new("worker")
        };
        mesh.add_agent(worker)
            .await
            .expect("adding the worker should succeed");

        let requirements = TaskRequirements {
            min_compute_power: 1.0,
            min_memory: 1024,
            required_specializations: vec![String::from("general")],
            max_latency_ms: 100.0,
        };

        let task_id = mesh
            .submit_task("test-task", requirements)
            .await
            .expect("submitting the task should succeed");
        assert_eq!(mesh.get_stats().total_tasks, 1);

        let submitted = mesh
            .tasks
            .get(&task_id)
            .expect("the submitted task should be stored");
        assert!(submitted.assigned_agent.is_some());
        assert_eq!(submitted.status, TaskStatus::Assigned);
    }
}