Skip to main content

spec_ai/spec_ai_core/
mesh.rs

1//! Shared mesh protocol types and client helpers.
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use hostname::get as get_hostname;
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::collections::HashMap;
9use uuid::{NoContext, Timestamp, Uuid};
10
11/// Agent instance information in the mesh
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MeshInstance {
14    pub instance_id: String,
15    pub hostname: String,
16    pub port: u16,
17    pub capabilities: Vec<String>,
18    pub is_leader: bool,
19    pub last_heartbeat: DateTime<Utc>,
20    pub created_at: DateTime<Utc>,
21    pub agent_profiles: Vec<String>,
22}
23
24/// Request to register a new instance
25#[derive(Debug, Serialize, Deserialize)]
26pub struct RegisterRequest {
27    pub instance_id: String,
28    pub hostname: String,
29    pub port: u16,
30    pub capabilities: Vec<String>,
31    pub agent_profiles: Vec<String>,
32}
33
34/// Response from registration
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RegisterResponse {
37    pub success: bool,
38    pub instance_id: String,
39    pub is_leader: bool,
40    pub leader_id: Option<String>,
41    pub peers: Vec<MeshInstance>,
42}
43
44/// List of registered instances
45#[derive(Debug, Serialize, Deserialize)]
46pub struct InstancesResponse {
47    pub instances: Vec<MeshInstance>,
48    pub leader_id: Option<String>,
49}
50
51/// Heartbeat request
52#[derive(Debug, Serialize, Deserialize)]
53pub struct HeartbeatRequest {
54    pub status: String,
55    pub metrics: Option<HashMap<String, serde_json::Value>>,
56}
57
58/// Heartbeat response
59#[derive(Debug, Serialize, Deserialize)]
60pub struct HeartbeatResponse {
61    pub acknowledged: bool,
62    pub leader_id: Option<String>,
63    pub should_sync: bool,
64}
65
66/// Message types for inter-agent communication
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68pub enum MessageType {
69    Query,
70    Response,
71    Notification,
72    TaskDelegation,
73    TaskResult,
74    GraphSync,
75    // Collective Intelligence message types
76    CapabilityUpdate,    // Share capability/expertise profile updates
77    CapabilityQuery,     // Request capability information from peers
78    LearningShare,       // Share a learned strategy with the mesh
79    ProposalSubmit,      // Submit a proposal for collective decision
80    ProposalVote,        // Cast a vote on a proposal
81    WorkflowAssignment,  // Assign a workflow stage to an agent
82    WorkflowStageResult, // Report completion of a workflow stage
83    Custom(String),
84}
85
86impl MessageType {
87    pub fn as_str(&self) -> String {
88        match self {
89            MessageType::Query => "query".to_string(),
90            MessageType::Response => "response".to_string(),
91            MessageType::Notification => "notification".to_string(),
92            MessageType::TaskDelegation => "task_delegation".to_string(),
93            MessageType::TaskResult => "task_result".to_string(),
94            MessageType::GraphSync => "graph_sync".to_string(),
95            MessageType::CapabilityUpdate => "capability_update".to_string(),
96            MessageType::CapabilityQuery => "capability_query".to_string(),
97            MessageType::LearningShare => "learning_share".to_string(),
98            MessageType::ProposalSubmit => "proposal_submit".to_string(),
99            MessageType::ProposalVote => "proposal_vote".to_string(),
100            MessageType::WorkflowAssignment => "workflow_assignment".to_string(),
101            MessageType::WorkflowStageResult => "workflow_stage_result".to_string(),
102            MessageType::Custom(s) => s.clone(),
103        }
104    }
105
106    #[allow(clippy::should_implement_trait)]
107    pub fn from_str(s: &str) -> Self {
108        match s.to_lowercase().as_str() {
109            "query" => MessageType::Query,
110            "response" => MessageType::Response,
111            "notification" => MessageType::Notification,
112            "task_delegation" => MessageType::TaskDelegation,
113            "task_result" => MessageType::TaskResult,
114            "graph_sync" => MessageType::GraphSync,
115            "capability_update" => MessageType::CapabilityUpdate,
116            "capability_query" => MessageType::CapabilityQuery,
117            "learning_share" => MessageType::LearningShare,
118            "proposal_submit" => MessageType::ProposalSubmit,
119            "proposal_vote" => MessageType::ProposalVote,
120            "workflow_assignment" => MessageType::WorkflowAssignment,
121            "workflow_stage_result" => MessageType::WorkflowStageResult,
122            custom => MessageType::Custom(custom.to_string()),
123        }
124    }
125}
126
127/// Inter-agent message
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct AgentMessage {
130    pub message_id: String,
131    pub source_instance: String,
132    pub target_instance: Option<String>,
133    pub message_type: MessageType,
134    pub payload: serde_json::Value,
135    pub correlation_id: Option<String>,
136    pub created_at: DateTime<Utc>,
137}
138
139/// Message send request
140#[derive(Debug, Serialize, Deserialize)]
141pub struct SendMessageRequest {
142    pub target_instance: Option<String>,
143    pub message_type: MessageType,
144    pub payload: serde_json::Value,
145    pub correlation_id: Option<String>,
146}
147
148/// Message send response
149#[derive(Debug, Serialize, Deserialize)]
150pub struct SendMessageResponse {
151    pub message_id: String,
152    pub status: String,
153    pub delivered_to: Vec<String>,
154}
155
156/// Pending messages response
157#[derive(Debug, Serialize, Deserialize)]
158pub struct PendingMessagesResponse {
159    pub messages: Vec<AgentMessage>,
160}
161
162/// Client-side mesh operations
163#[derive(Clone)]
164pub struct MeshClient {
165    base_url: String,
166    client: Client,
167}
168
169impl MeshClient {
170    pub fn new(host: &str, port: u16) -> Self {
171        Self {
172            base_url: format!("http://{}:{}", host, port),
173            client: Client::new(),
174        }
175    }
176
177    /// Generate a unique instance ID
178    pub fn generate_instance_id() -> String {
179        let hostname = get_hostname()
180            .ok()
181            .and_then(|h| h.into_string().ok())
182            .unwrap_or_else(|| "unknown".to_string());
183        let uuid = Uuid::new_v7(Timestamp::now(NoContext));
184        format!("{}-{}", hostname, uuid)
185    }
186
187    /// Register this instance with a mesh registry
188    pub async fn register(
189        &self,
190        instance_id: String,
191        hostname: String,
192        port: u16,
193        capabilities: Vec<String>,
194        agent_profiles: Vec<String>,
195    ) -> Result<RegisterResponse> {
196        let request = RegisterRequest {
197            instance_id,
198            hostname,
199            port,
200            capabilities,
201            agent_profiles,
202        };
203
204        let response = self
205            .client
206            .post(format!("{}/registry/register", self.base_url))
207            .json(&request)
208            .send()
209            .await?;
210
211        if response.status().is_success() {
212            Ok(response.json().await?)
213        } else {
214            anyhow::bail!("Registration failed: {}", response.status())
215        }
216    }
217
218    /// Send heartbeat to registry
219    pub async fn heartbeat(
220        &self,
221        instance_id: &str,
222        metrics: Option<HashMap<String, serde_json::Value>>,
223    ) -> Result<HeartbeatResponse> {
224        let request = HeartbeatRequest {
225            status: "healthy".to_string(),
226            metrics,
227        };
228
229        let response = self
230            .client
231            .post(format!(
232                "{}/registry/heartbeat/{}",
233                self.base_url, instance_id
234            ))
235            .json(&request)
236            .send()
237            .await?;
238
239        if response.status().is_success() {
240            Ok(response.json().await?)
241        } else {
242            anyhow::bail!("Heartbeat failed: {}", response.status())
243        }
244    }
245
246    /// List all instances in the mesh
247    pub async fn list_instances(&self) -> Result<InstancesResponse> {
248        let response = self
249            .client
250            .get(format!("{}/registry/agents", self.base_url))
251            .send()
252            .await?;
253
254        if response.status().is_success() {
255            Ok(response.json().await?)
256        } else {
257            anyhow::bail!("Failed to list instances: {}", response.status())
258        }
259    }
260
261    /// Deregister from the mesh
262    pub async fn deregister(&self, instance_id: &str) -> Result<()> {
263        let response = self
264            .client
265            .delete(format!(
266                "{}/registry/deregister/{}",
267                self.base_url, instance_id
268            ))
269            .send()
270            .await?;
271
272        if response.status().is_success() {
273            Ok(())
274        } else {
275            anyhow::bail!("Deregistration failed: {}", response.status())
276        }
277    }
278
279    /// Send a message to another instance
280    pub async fn send_message(
281        &self,
282        source_instance: String,
283        target_instance: Option<String>,
284        message_type: MessageType,
285        payload: serde_json::Value,
286        correlation_id: Option<String>,
287    ) -> Result<SendMessageResponse> {
288        let request = SendMessageRequest {
289            target_instance,
290            message_type,
291            payload,
292            correlation_id,
293        };
294
295        let response = self
296            .client
297            .post(format!(
298                "{}/messages/send/{}",
299                self.base_url, source_instance
300            ))
301            .json(&request)
302            .send()
303            .await?;
304
305        if response.status().is_success() {
306            Ok(response.json().await?)
307        } else {
308            anyhow::bail!("Send message failed: {}", response.status())
309        }
310    }
311
312    /// Get pending messages for an instance
313    pub async fn get_messages(&self, instance_id: &str) -> Result<PendingMessagesResponse> {
314        let response = self
315            .client
316            .get(format!("{}/messages/{}", self.base_url, instance_id))
317            .send()
318            .await?;
319
320        if response.status().is_success() {
321            Ok(response.json().await?)
322        } else {
323            anyhow::bail!("Failed to get messages: {}", response.status())
324        }
325    }
326
327    /// Acknowledge delivered messages
328    pub async fn acknowledge_messages(
329        &self,
330        instance_id: &str,
331        message_ids: Vec<String>,
332    ) -> Result<()> {
333        let response = self
334            .client
335            .post(format!("{}/messages/{}/ack", self.base_url, instance_id))
336            .json(&json!({ "message_ids": message_ids }))
337            .send()
338            .await?;
339
340        if response.status().is_success() {
341            Ok(())
342        } else {
343            anyhow::bail!("Failed to acknowledge messages: {}", response.status())
344        }
345    }
346}