1use anyhow::Result;
3use chrono::{DateTime, Utc};
4use hostname::get as get_hostname;
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::collections::HashMap;
9use uuid::{NoContext, Timestamp, Uuid};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct MeshInstance {
14 pub instance_id: String,
15 pub hostname: String,
16 pub port: u16,
17 pub capabilities: Vec<String>,
18 pub is_leader: bool,
19 pub last_heartbeat: DateTime<Utc>,
20 pub created_at: DateTime<Utc>,
21 pub agent_profiles: Vec<String>,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
26pub struct RegisterRequest {
27 pub instance_id: String,
28 pub hostname: String,
29 pub port: u16,
30 pub capabilities: Vec<String>,
31 pub agent_profiles: Vec<String>,
32}
33
34#[derive(Debug, Serialize, Deserialize)]
36pub struct RegisterResponse {
37 pub success: bool,
38 pub instance_id: String,
39 pub is_leader: bool,
40 pub leader_id: Option<String>,
41 pub peers: Vec<MeshInstance>,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
46pub struct InstancesResponse {
47 pub instances: Vec<MeshInstance>,
48 pub leader_id: Option<String>,
49}
50
51#[derive(Debug, Serialize, Deserialize)]
53pub struct HeartbeatRequest {
54 pub status: String,
55 pub metrics: Option<HashMap<String, serde_json::Value>>,
56}
57
58#[derive(Debug, Serialize, Deserialize)]
60pub struct HeartbeatResponse {
61 pub acknowledged: bool,
62 pub leader_id: Option<String>,
63 pub should_sync: bool,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68pub enum MessageType {
69 Query,
70 Response,
71 Notification,
72 TaskDelegation,
73 TaskResult,
74 GraphSync,
75 CapabilityUpdate, CapabilityQuery, LearningShare, ProposalSubmit, ProposalVote, WorkflowAssignment, WorkflowStageResult, Custom(String),
84}
85
86impl MessageType {
87 pub fn as_str(&self) -> String {
88 match self {
89 MessageType::Query => "query".to_string(),
90 MessageType::Response => "response".to_string(),
91 MessageType::Notification => "notification".to_string(),
92 MessageType::TaskDelegation => "task_delegation".to_string(),
93 MessageType::TaskResult => "task_result".to_string(),
94 MessageType::GraphSync => "graph_sync".to_string(),
95 MessageType::CapabilityUpdate => "capability_update".to_string(),
96 MessageType::CapabilityQuery => "capability_query".to_string(),
97 MessageType::LearningShare => "learning_share".to_string(),
98 MessageType::ProposalSubmit => "proposal_submit".to_string(),
99 MessageType::ProposalVote => "proposal_vote".to_string(),
100 MessageType::WorkflowAssignment => "workflow_assignment".to_string(),
101 MessageType::WorkflowStageResult => "workflow_stage_result".to_string(),
102 MessageType::Custom(s) => s.clone(),
103 }
104 }
105
106 #[allow(clippy::should_implement_trait)]
107 pub fn from_str(s: &str) -> Self {
108 match s.to_lowercase().as_str() {
109 "query" => MessageType::Query,
110 "response" => MessageType::Response,
111 "notification" => MessageType::Notification,
112 "task_delegation" => MessageType::TaskDelegation,
113 "task_result" => MessageType::TaskResult,
114 "graph_sync" => MessageType::GraphSync,
115 "capability_update" => MessageType::CapabilityUpdate,
116 "capability_query" => MessageType::CapabilityQuery,
117 "learning_share" => MessageType::LearningShare,
118 "proposal_submit" => MessageType::ProposalSubmit,
119 "proposal_vote" => MessageType::ProposalVote,
120 "workflow_assignment" => MessageType::WorkflowAssignment,
121 "workflow_stage_result" => MessageType::WorkflowStageResult,
122 custom => MessageType::Custom(custom.to_string()),
123 }
124 }
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct AgentMessage {
130 pub message_id: String,
131 pub source_instance: String,
132 pub target_instance: Option<String>,
133 pub message_type: MessageType,
134 pub payload: serde_json::Value,
135 pub correlation_id: Option<String>,
136 pub created_at: DateTime<Utc>,
137}
138
139#[derive(Debug, Serialize, Deserialize)]
141pub struct SendMessageRequest {
142 pub target_instance: Option<String>,
143 pub message_type: MessageType,
144 pub payload: serde_json::Value,
145 pub correlation_id: Option<String>,
146}
147
148#[derive(Debug, Serialize, Deserialize)]
150pub struct SendMessageResponse {
151 pub message_id: String,
152 pub status: String,
153 pub delivered_to: Vec<String>,
154}
155
156#[derive(Debug, Serialize, Deserialize)]
158pub struct PendingMessagesResponse {
159 pub messages: Vec<AgentMessage>,
160}
161
162#[derive(Clone)]
164pub struct MeshClient {
165 base_url: String,
166 client: Client,
167}
168
169impl MeshClient {
170 pub fn new(host: &str, port: u16) -> Self {
171 Self {
172 base_url: format!("http://{}:{}", host, port),
173 client: Client::new(),
174 }
175 }
176
177 pub fn generate_instance_id() -> String {
179 let hostname = get_hostname()
180 .ok()
181 .and_then(|h| h.into_string().ok())
182 .unwrap_or_else(|| "unknown".to_string());
183 let uuid = Uuid::new_v7(Timestamp::now(NoContext));
184 format!("{}-{}", hostname, uuid)
185 }
186
187 pub async fn register(
189 &self,
190 instance_id: String,
191 hostname: String,
192 port: u16,
193 capabilities: Vec<String>,
194 agent_profiles: Vec<String>,
195 ) -> Result<RegisterResponse> {
196 let request = RegisterRequest {
197 instance_id,
198 hostname,
199 port,
200 capabilities,
201 agent_profiles,
202 };
203
204 let response = self
205 .client
206 .post(format!("{}/registry/register", self.base_url))
207 .json(&request)
208 .send()
209 .await?;
210
211 if response.status().is_success() {
212 Ok(response.json().await?)
213 } else {
214 anyhow::bail!("Registration failed: {}", response.status())
215 }
216 }
217
218 pub async fn heartbeat(
220 &self,
221 instance_id: &str,
222 metrics: Option<HashMap<String, serde_json::Value>>,
223 ) -> Result<HeartbeatResponse> {
224 let request = HeartbeatRequest {
225 status: "healthy".to_string(),
226 metrics,
227 };
228
229 let response = self
230 .client
231 .post(format!(
232 "{}/registry/heartbeat/{}",
233 self.base_url, instance_id
234 ))
235 .json(&request)
236 .send()
237 .await?;
238
239 if response.status().is_success() {
240 Ok(response.json().await?)
241 } else {
242 anyhow::bail!("Heartbeat failed: {}", response.status())
243 }
244 }
245
246 pub async fn list_instances(&self) -> Result<InstancesResponse> {
248 let response = self
249 .client
250 .get(format!("{}/registry/agents", self.base_url))
251 .send()
252 .await?;
253
254 if response.status().is_success() {
255 Ok(response.json().await?)
256 } else {
257 anyhow::bail!("Failed to list instances: {}", response.status())
258 }
259 }
260
261 pub async fn deregister(&self, instance_id: &str) -> Result<()> {
263 let response = self
264 .client
265 .delete(format!(
266 "{}/registry/deregister/{}",
267 self.base_url, instance_id
268 ))
269 .send()
270 .await?;
271
272 if response.status().is_success() {
273 Ok(())
274 } else {
275 anyhow::bail!("Deregistration failed: {}", response.status())
276 }
277 }
278
279 pub async fn send_message(
281 &self,
282 source_instance: String,
283 target_instance: Option<String>,
284 message_type: MessageType,
285 payload: serde_json::Value,
286 correlation_id: Option<String>,
287 ) -> Result<SendMessageResponse> {
288 let request = SendMessageRequest {
289 target_instance,
290 message_type,
291 payload,
292 correlation_id,
293 };
294
295 let response = self
296 .client
297 .post(format!(
298 "{}/messages/send/{}",
299 self.base_url, source_instance
300 ))
301 .json(&request)
302 .send()
303 .await?;
304
305 if response.status().is_success() {
306 Ok(response.json().await?)
307 } else {
308 anyhow::bail!("Send message failed: {}", response.status())
309 }
310 }
311
312 pub async fn get_messages(&self, instance_id: &str) -> Result<PendingMessagesResponse> {
314 let response = self
315 .client
316 .get(format!("{}/messages/{}", self.base_url, instance_id))
317 .send()
318 .await?;
319
320 if response.status().is_success() {
321 Ok(response.json().await?)
322 } else {
323 anyhow::bail!("Failed to get messages: {}", response.status())
324 }
325 }
326
327 pub async fn acknowledge_messages(
329 &self,
330 instance_id: &str,
331 message_ids: Vec<String>,
332 ) -> Result<()> {
333 let response = self
334 .client
335 .post(format!("{}/messages/{}/ack", self.base_url, instance_id))
336 .json(&json!({ "message_ids": message_ids }))
337 .send()
338 .await?;
339
340 if response.status().is_success() {
341 Ok(())
342 } else {
343 anyhow::bail!("Failed to acknowledge messages: {}", response.status())
344 }
345 }
346}