1use crate::persistence::Persistence;
2use anyhow::Result;
3use axum::{
5 extract::{Json, Path, State},
6 http::StatusCode,
7 response::IntoResponse,
8};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// One instance known to the mesh, as stored in and returned by `MeshRegistry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshInstance {
    /// Identifier of the instance; the registry uses it as the map key.
    pub instance_id: String,
    /// Hostname supplied by the instance at registration.
    pub hostname: String,
    /// Port supplied by the instance at registration.
    pub port: u16,
    /// Capability labels supplied at registration (opaque strings to this module).
    pub capabilities: Vec<String>,
    /// Leader flag; assigned by the registry on registration and on re-election.
    pub is_leader: bool,
    /// Time of the latest heartbeat; refreshed by `MeshRegistry::heartbeat`.
    pub last_heartbeat: DateTime<Utc>,
    /// Time at which this record was created.
    pub created_at: DateTime<Utc>,
    /// Agent profile names supplied at registration (opaque strings to this module).
    pub agent_profiles: Vec<String>,
}
27
/// JSON body sent by `MeshClient::register` and consumed by the
/// `register_instance` handler.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterRequest {
    /// Identifier the instance wants to be registered under.
    pub instance_id: String,
    /// Hostname of the registering instance.
    pub hostname: String,
    /// Port of the registering instance.
    pub port: u16,
    /// Capability labels of the registering instance.
    pub capabilities: Vec<String>,
    /// Agent profile names of the registering instance.
    pub agent_profiles: Vec<String>,
}
37
/// Result of a registration, produced by `MeshRegistry::register`.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterResponse {
    /// Whether the registration was accepted (the registry always sets `true`).
    pub success: bool,
    /// Identifier under which the instance was stored.
    pub instance_id: String,
    /// Whether the registered instance was made leader.
    pub is_leader: bool,
    /// Identifier of the current leader, if any.
    pub leader_id: Option<String>,
    /// Snapshot of every registered instance, taken after the insert
    /// (so it includes the instance that just registered).
    pub peers: Vec<MeshInstance>,
}
47
/// JSON body returned by the `list_instances` handler.
#[derive(Debug, Serialize, Deserialize)]
pub struct InstancesResponse {
    /// All instances currently registered.
    pub instances: Vec<MeshInstance>,
    /// Identifier of the instance flagged as leader, if any.
    pub leader_id: Option<String>,
}
54
/// JSON body of a heartbeat call.
///
/// The `heartbeat` handler in this file deserializes the body but does not
/// read its fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRequest {
    /// Status string; `MeshClient::heartbeat` always sends `"healthy"`.
    pub status: String,
    /// Optional free-form metrics keyed by name.
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}
61
/// Reply to a heartbeat, produced by `MeshRegistry::heartbeat`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatResponse {
    /// `true` when the instance was found and its heartbeat time refreshed.
    pub acknowledged: bool,
    /// Identifier of the current leader, if any.
    pub leader_id: Option<String>,
    /// Sync hint for the caller; the registry in this file always sets `false`.
    pub should_sync: bool,
}
69
/// Kind of an `AgentMessage`.
///
/// `MessageType::as_str` / `MessageType::from_str` convert the built-in
/// variants to and from the snake_case names listed on each variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageType {
    /// Named `"query"`.
    Query,
    /// Named `"response"`.
    Response,
    /// Named `"notification"`.
    Notification,
    /// Named `"task_delegation"`.
    TaskDelegation,
    /// Named `"task_result"`.
    TaskResult,
    /// Named `"graph_sync"`.
    GraphSync,
    /// Any other kind; carries its own name.
    Custom(String),
}
81
82impl MessageType {
83 pub fn as_str(&self) -> String {
84 match self {
85 MessageType::Query => "query".to_string(),
86 MessageType::Response => "response".to_string(),
87 MessageType::Notification => "notification".to_string(),
88 MessageType::TaskDelegation => "task_delegation".to_string(),
89 MessageType::TaskResult => "task_result".to_string(),
90 MessageType::GraphSync => "graph_sync".to_string(),
91 MessageType::Custom(s) => s.clone(),
92 }
93 }
94
95 pub fn from_str(s: &str) -> Self {
96 match s.to_lowercase().as_str() {
97 "query" => MessageType::Query,
98 "response" => MessageType::Response,
99 "notification" => MessageType::Notification,
100 "task_delegation" => MessageType::TaskDelegation,
101 "task_result" => MessageType::TaskResult,
102 "graph_sync" => MessageType::GraphSync,
103 custom => MessageType::Custom(custom.to_string()),
104 }
105 }
106}
107
/// A message held in the registry's queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Identifier assigned by `MeshRegistry::send_message`.
    pub message_id: String,
    /// Identifier of the sending instance.
    pub source_instance: String,
    /// Recipient instance; `None` means the message is returned to every
    /// instance that asks for its pending messages.
    pub target_instance: Option<String>,
    /// Kind of the message.
    pub message_type: MessageType,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional caller-supplied correlation identifier (stored as given;
    /// presumably ties a reply to its request — confirm with callers).
    pub correlation_id: Option<String>,
    /// Time at which the message was created by the registry.
    pub created_at: DateTime<Utc>,
}
119
/// JSON body sent by `MeshClient::send_message` and consumed by the
/// `send_message` handler; the sender is taken from the URL path instead.
#[derive(Debug, Serialize, Deserialize)]
pub struct SendMessageRequest {
    /// Recipient instance, or `None` to address all instances.
    pub target_instance: Option<String>,
    /// Kind of the message.
    pub message_type: MessageType,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional caller-supplied correlation identifier.
    pub correlation_id: Option<String>,
}
128
/// Result of queuing a message, produced by `MeshRegistry::send_message`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SendMessageResponse {
    /// Identifier assigned to the queued message.
    pub message_id: String,
    /// Status string; the registry sets `"queued"`.
    pub status: String,
    /// The target instance, or every registered instance id for a message
    /// without a target.
    pub delivered_to: Vec<String>,
}
136
/// JSON body returned by the `get_messages` handler.
#[derive(Debug, Serialize, Deserialize)]
pub struct PendingMessagesResponse {
    /// Queued messages addressed to the requesting instance or to everyone.
    pub messages: Vec<AgentMessage>,
}
142
/// In-memory registry of mesh instances plus a message queue.
///
/// All state sits behind `Arc<RwLock<..>>`, so clones of a registry share the
/// same instances, leader and queue.
#[derive(Clone)]
pub struct MeshRegistry {
    /// Registered instances keyed by `instance_id`.
    instances: Arc<RwLock<HashMap<String, MeshInstance>>>,
    /// Identifier of the current leader, if any.
    leader_id: Arc<RwLock<Option<String>>>,
    /// Messages that have been sent and not yet acknowledged.
    message_queue: Arc<RwLock<Vec<AgentMessage>>>,
    /// Optional store; when present, `send_message` records messages in it.
    persistence: Option<Persistence>,
}
151
152impl MeshRegistry {
153 pub fn new() -> Self {
154 Self {
155 instances: Arc::new(RwLock::new(HashMap::new())),
156 leader_id: Arc::new(RwLock::new(None)),
157 message_queue: Arc::new(RwLock::new(Vec::new())),
158 persistence: None,
159 }
160 }
161
162 pub fn with_persistence(persistence: Persistence) -> Self {
163 Self {
164 instances: Arc::new(RwLock::new(HashMap::new())),
165 leader_id: Arc::new(RwLock::new(None)),
166 message_queue: Arc::new(RwLock::new(Vec::new())),
167 persistence: Some(persistence),
168 }
169 }
170
171 pub async fn register(&self, instance: MeshInstance) -> RegisterResponse {
173 let mut instances = self.instances.write().await;
174 let mut leader = self.leader_id.write().await;
175
176 let is_leader = instances.is_empty();
178 let mut new_instance = instance.clone();
179 new_instance.is_leader = is_leader;
180
181 if is_leader {
182 *leader = Some(instance.instance_id.clone());
183 }
184
185 instances.insert(instance.instance_id.clone(), new_instance);
186
187 RegisterResponse {
188 success: true,
189 instance_id: instance.instance_id.clone(),
190 is_leader,
191 leader_id: leader.clone(),
192 peers: instances.values().cloned().collect(),
193 }
194 }
195
196 pub async fn heartbeat(&self, instance_id: &str) -> HeartbeatResponse {
198 let mut instances = self.instances.write().await;
199 let leader = self.leader_id.read().await;
200
201 if let Some(instance) = instances.get_mut(instance_id) {
202 instance.last_heartbeat = Utc::now();
203 HeartbeatResponse {
204 acknowledged: true,
205 leader_id: leader.clone(),
206 should_sync: false,
207 }
208 } else {
209 HeartbeatResponse {
210 acknowledged: false,
211 leader_id: leader.clone(),
212 should_sync: false,
213 }
214 }
215 }
216
217 pub async fn deregister(&self, instance_id: &str) -> bool {
219 let mut instances = self.instances.write().await;
220 let mut leader = self.leader_id.write().await;
221
222 if let Some(instance) = instances.remove(instance_id) {
223 if instance.is_leader && !instances.is_empty() {
225 if let Some((new_leader_id, new_leader)) = instances.iter_mut().next() {
227 new_leader.is_leader = true;
228 *leader = Some(new_leader_id.clone());
229 }
230 } else if instances.is_empty() {
231 *leader = None;
232 }
233 true
234 } else {
235 false
236 }
237 }
238
239 pub async fn list(&self) -> Vec<MeshInstance> {
241 let instances = self.instances.read().await;
242 instances.values().cloned().collect()
243 }
244
245 pub async fn cleanup_stale(&self, timeout_secs: u64) {
247 let now = Utc::now();
248 let mut instances = self.instances.write().await;
249 let mut leader = self.leader_id.write().await;
250
251 let stale_ids: Vec<String> = instances
252 .iter()
253 .filter_map(|(id, instance)| {
254 let elapsed = now.timestamp() - instance.last_heartbeat.timestamp();
255 if elapsed > timeout_secs as i64 {
256 Some(id.clone())
257 } else {
258 None
259 }
260 })
261 .collect();
262
263 for id in stale_ids {
264 if let Some(instance) = instances.remove(&id) {
265 if instance.is_leader && !instances.is_empty() {
267 if let Some((new_leader_id, new_leader)) = instances.iter_mut().next() {
268 new_leader.is_leader = true;
269 *leader = Some(new_leader_id.clone());
270 }
271 }
272 }
273 }
274
275 if instances.is_empty() {
276 *leader = None;
277 }
278 }
279
280 pub async fn get_leader(&self) -> Option<String> {
282 let leader = self.leader_id.read().await;
283 leader.clone()
284 }
285
286 pub async fn send_message(
288 &self,
289 source_instance: String,
290 target_instance: Option<String>,
291 message_type: MessageType,
292 payload: serde_json::Value,
293 correlation_id: Option<String>,
294 ) -> Result<SendMessageResponse> {
295 let message_id = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)).to_string();
297
298 let message = AgentMessage {
299 message_id: message_id.clone(),
300 source_instance,
301 target_instance: target_instance.clone(),
302 message_type,
303 payload,
304 correlation_id,
305 created_at: Utc::now(),
306 };
307
308 if let Some(ref persistence) = self.persistence {
310 let target_str = target_instance.as_deref();
311 if let Err(e) = persistence.mesh_message_store(
312 &message_id,
313 &message.source_instance,
314 target_str,
315 &message.message_type.as_str(),
316 &message.payload,
317 "pending",
318 ) {
319 tracing::warn!("Failed to persist mesh message: {}", e);
320 }
321 }
322
323 let mut queue = self.message_queue.write().await;
325 queue.push(message.clone());
326
327 let delivered_to = if let Some(ref target) = target_instance {
332 let instances = self.instances.read().await;
333 if instances.contains_key(target) {
334 vec![target.clone()]
335 } else {
336 return Err(anyhow::anyhow!("Target instance '{}' not found", target));
337 }
338 } else {
339 let instances = self.instances.read().await;
341 instances.keys().cloned().collect()
342 };
343
344 Ok(SendMessageResponse {
345 message_id,
346 status: "queued".to_string(),
347 delivered_to,
348 })
349 }
350
351 pub async fn get_pending_messages(&self, instance_id: &str) -> Vec<AgentMessage> {
353 let queue = self.message_queue.read().await;
354 queue
355 .iter()
356 .filter(|msg| {
357 msg.target_instance.as_deref() == Some(instance_id) || msg.target_instance.is_none()
359 })
360 .cloned()
361 .collect()
362 }
363
364 pub async fn acknowledge_messages(&self, message_ids: Vec<String>) {
366 let mut queue = self.message_queue.write().await;
367 queue.retain(|msg| !message_ids.contains(&msg.message_id));
368 }
369}
370
/// HTTP client for the registry and message endpoints served by the handlers
/// in this file.
#[derive(Clone)]
pub struct MeshClient {
    /// Scheme, host and port every request URL is built from
    /// (`http://{host}:{port}`, no trailing slash).
    base_url: String,
    /// Underlying `reqwest` client used for all requests.
    client: reqwest::Client,
}
377
378impl MeshClient {
379 pub fn new(host: &str, port: u16) -> Self {
380 Self {
381 base_url: format!("http://{}:{}", host, port),
382 client: reqwest::Client::new(),
383 }
384 }
385
386 pub fn generate_instance_id() -> String {
388 let hostname = hostname::get()
389 .ok()
390 .and_then(|h| h.into_string().ok())
391 .unwrap_or_else(|| "unknown".to_string());
392 let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
394 format!("{}-{}", hostname, uuid)
395 }
396
397 pub async fn register(
399 &self,
400 instance_id: String,
401 hostname: String,
402 port: u16,
403 capabilities: Vec<String>,
404 agent_profiles: Vec<String>,
405 ) -> Result<RegisterResponse> {
406 let request = RegisterRequest {
407 instance_id,
408 hostname,
409 port,
410 capabilities,
411 agent_profiles,
412 };
413
414 let response = self
415 .client
416 .post(format!("{}/registry/register", self.base_url))
417 .json(&request)
418 .send()
419 .await?;
420
421 if response.status().is_success() {
422 Ok(response.json().await?)
423 } else {
424 anyhow::bail!("Registration failed: {}", response.status())
425 }
426 }
427
428 pub async fn heartbeat(
430 &self,
431 instance_id: &str,
432 metrics: Option<HashMap<String, serde_json::Value>>,
433 ) -> Result<HeartbeatResponse> {
434 let request = HeartbeatRequest {
435 status: "healthy".to_string(),
436 metrics,
437 };
438
439 let response = self
440 .client
441 .post(format!(
442 "{}/registry/heartbeat/{}",
443 self.base_url, instance_id
444 ))
445 .json(&request)
446 .send()
447 .await?;
448
449 if response.status().is_success() {
450 Ok(response.json().await?)
451 } else {
452 anyhow::bail!("Heartbeat failed: {}", response.status())
453 }
454 }
455
456 pub async fn list_instances(&self) -> Result<InstancesResponse> {
458 let response = self
459 .client
460 .get(format!("{}/registry/agents", self.base_url))
461 .send()
462 .await?;
463
464 if response.status().is_success() {
465 Ok(response.json().await?)
466 } else {
467 anyhow::bail!("Failed to list instances: {}", response.status())
468 }
469 }
470
471 pub async fn deregister(&self, instance_id: &str) -> Result<()> {
473 let response = self
474 .client
475 .delete(format!(
476 "{}/registry/deregister/{}",
477 self.base_url, instance_id
478 ))
479 .send()
480 .await?;
481
482 if response.status().is_success() {
483 Ok(())
484 } else {
485 anyhow::bail!("Deregistration failed: {}", response.status())
486 }
487 }
488
489 pub async fn send_message(
491 &self,
492 source_instance: String,
493 target_instance: Option<String>,
494 message_type: MessageType,
495 payload: serde_json::Value,
496 correlation_id: Option<String>,
497 ) -> Result<SendMessageResponse> {
498 let request = SendMessageRequest {
499 target_instance,
500 message_type,
501 payload,
502 correlation_id,
503 };
504
505 let response = self
506 .client
507 .post(format!(
508 "{}/messages/send/{}",
509 self.base_url, source_instance
510 ))
511 .json(&request)
512 .send()
513 .await?;
514
515 if response.status().is_success() {
516 Ok(response.json().await?)
517 } else {
518 anyhow::bail!("Send message failed: {}", response.status())
519 }
520 }
521
522 pub async fn get_messages(&self, instance_id: &str) -> Result<PendingMessagesResponse> {
524 let response = self
525 .client
526 .get(format!("{}/messages/{}", self.base_url, instance_id))
527 .send()
528 .await?;
529
530 if response.status().is_success() {
531 Ok(response.json().await?)
532 } else {
533 anyhow::bail!("Get messages failed: {}", response.status())
534 }
535 }
536
537 pub async fn acknowledge_messages(
539 &self,
540 instance_id: &str,
541 message_ids: Vec<String>,
542 ) -> Result<()> {
543 let request = AcknowledgeMessagesRequest { message_ids };
544
545 let response = self
546 .client
547 .post(format!("{}/messages/ack/{}", self.base_url, instance_id))
548 .json(&request)
549 .send()
550 .await?;
551
552 if response.status().is_success() {
553 Ok(())
554 } else {
555 anyhow::bail!("Acknowledge failed: {}", response.status())
556 }
557 }
558}
559
/// Application state that can hand out the mesh registry.
///
/// The HTTP handlers in this file are generic over this trait and reach the
/// registry only through it.
pub trait MeshState {
    /// Returns the registry the handlers should operate on.
    fn mesh_registry(&self) -> &MeshRegistry;
}
564
565pub async fn register_instance<S: MeshState>(
567 State(state): State<S>,
568 Json(request): Json<RegisterRequest>,
569) -> impl IntoResponse {
570 let instance = MeshInstance {
571 instance_id: request.instance_id,
572 hostname: request.hostname,
573 port: request.port,
574 capabilities: request.capabilities,
575 is_leader: false, last_heartbeat: Utc::now(),
577 created_at: Utc::now(),
578 agent_profiles: request.agent_profiles,
579 };
580
581 let response = state.mesh_registry().register(instance).await;
582 (StatusCode::OK, Json(response))
583}
584
585pub async fn list_instances<S: MeshState>(State(state): State<S>) -> impl IntoResponse {
587 let instances = state.mesh_registry().list().await;
588 let leader_id = instances
589 .iter()
590 .find(|i| i.is_leader)
591 .map(|i| i.instance_id.clone());
592
593 Json(InstancesResponse {
594 instances,
595 leader_id,
596 })
597}
598
599pub async fn heartbeat<S: MeshState>(
601 State(state): State<S>,
602 Path(instance_id): Path<String>,
603 Json(_request): Json<HeartbeatRequest>,
604) -> impl IntoResponse {
605 let response = state.mesh_registry().heartbeat(&instance_id).await;
606
607 if response.acknowledged {
608 (StatusCode::OK, Json(response))
609 } else {
610 (StatusCode::NOT_FOUND, Json(response))
611 }
612}
613
614pub async fn deregister_instance<S: MeshState>(
616 State(state): State<S>,
617 Path(instance_id): Path<String>,
618) -> impl IntoResponse {
619 let removed = state.mesh_registry().deregister(&instance_id).await;
620
621 if removed {
622 StatusCode::NO_CONTENT
623 } else {
624 StatusCode::NOT_FOUND
625 }
626}
627
628pub async fn send_message<S: MeshState>(
630 State(state): State<S>,
631 Path(source_instance): Path<String>,
632 Json(request): Json<SendMessageRequest>,
633) -> impl IntoResponse {
634 match state
635 .mesh_registry()
636 .send_message(
637 source_instance,
638 request.target_instance,
639 request.message_type,
640 request.payload,
641 request.correlation_id,
642 )
643 .await
644 {
645 Ok(response) => (StatusCode::OK, Json(response)).into_response(),
646 Err(e) => (
647 StatusCode::BAD_REQUEST,
648 Json(serde_json::json!({
649 "error": e.to_string()
650 })),
651 )
652 .into_response(),
653 }
654}
655
656pub async fn get_messages<S: MeshState>(
658 State(state): State<S>,
659 Path(instance_id): Path<String>,
660) -> impl IntoResponse {
661 let messages = state
662 .mesh_registry()
663 .get_pending_messages(&instance_id)
664 .await;
665
666 Json(PendingMessagesResponse { messages })
667}
668
/// JSON body sent by `MeshClient::acknowledge_messages` and consumed by the
/// `acknowledge_messages` handler.
#[derive(Debug, Serialize, Deserialize)]
pub struct AcknowledgeMessagesRequest {
    /// Ids of the messages to remove from the queue.
    pub message_ids: Vec<String>,
}
674
675pub async fn acknowledge_messages<S: MeshState>(
677 State(state): State<S>,
678 Path(_instance_id): Path<String>,
679 Json(request): Json<AcknowledgeMessagesRequest>,
680) -> impl IntoResponse {
681 state
682 .mesh_registry()
683 .acknowledge_messages(request.message_ids)
684 .await;
685
686 StatusCode::NO_CONTENT
687}