use anyhow::Result;
use chrono::{DateTime, Utc};
use hostname::get as get_hostname;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use uuid::{NoContext, Timestamp, Uuid};
/// Description of a single instance as carried in registry responses.
///
/// Values of this type appear in [`RegisterResponse::peers`] and
/// [`InstancesResponse::instances`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshInstance {
    /// Identifier of the instance.
    pub instance_id: String,
    /// Host name reported for the instance.
    pub hostname: String,
    /// Port reported for the instance.
    pub port: u16,
    /// Capability names reported for the instance.
    pub capabilities: Vec<String>,
    /// Leader flag as reported by the registry.
    pub is_leader: bool,
    /// Timestamp of the last heartbeat (UTC).
    pub last_heartbeat: DateTime<Utc>,
    /// Creation timestamp of the record (UTC).
    pub created_at: DateTime<Utc>,
    /// Agent profile names reported for the instance.
    pub agent_profiles: Vec<String>,
}
/// JSON body sent by [`MeshClient::register`] to `/registry/register`.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterRequest {
    /// Identifier the caller registers under.
    pub instance_id: String,
    /// Host name announced for the instance.
    pub hostname: String,
    /// Port announced for the instance.
    pub port: u16,
    /// Capability names announced for the instance.
    pub capabilities: Vec<String>,
    /// Agent profile names announced for the instance.
    pub agent_profiles: Vec<String>,
}
/// Body decoded by [`MeshClient::register`] from a successful registration.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterResponse {
    /// Success flag reported by the registry.
    pub success: bool,
    /// Instance identifier echoed back by the registry.
    pub instance_id: String,
    /// Leader flag for the registering instance, as reported by the registry.
    pub is_leader: bool,
    /// Identifier of the leader, when the response includes one.
    pub leader_id: Option<String>,
    /// Other instances included in the response.
    pub peers: Vec<MeshInstance>,
}
/// Body decoded by [`MeshClient::list_instances`] from `/registry/agents`.
#[derive(Debug, Serialize, Deserialize)]
pub struct InstancesResponse {
    /// Instances returned by the registry.
    pub instances: Vec<MeshInstance>,
    /// Identifier of the leader, when the response includes one.
    pub leader_id: Option<String>,
}
/// JSON body sent by [`MeshClient::heartbeat`] to
/// `/registry/heartbeat/{instance_id}`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRequest {
    /// Status string; [`MeshClient::heartbeat`] always sends `"healthy"`.
    pub status: String,
    /// Optional free-form metrics, keyed by name, with arbitrary JSON values.
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}
/// Body decoded by [`MeshClient::heartbeat`] from a successful heartbeat.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatResponse {
    /// Acknowledgement flag reported by the registry.
    pub acknowledged: bool,
    /// Identifier of the leader, when the response includes one.
    pub leader_id: Option<String>,
    /// Sync hint reported by the registry.
    // NOTE(review): presumably asks the caller to resynchronise state — confirm
    // against the registry implementation.
    pub should_sync: bool,
}
/// Kind of message exchanged between instances.
///
/// Each built-in variant has a fixed snake_case name (see
/// [`MessageType::as_str`] and [`MessageType::from_str`]); any other name is
/// carried in [`MessageType::Custom`].
// NOTE(review): the derived serde representation uses the variant identifiers
// (e.g. `Query`), not the snake_case names produced by `as_str` — confirm that
// this is the wire format the peer expects.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MessageType {
    /// Named `"query"`.
    Query,
    /// Named `"response"`.
    Response,
    /// Named `"notification"`.
    Notification,
    /// Named `"task_delegation"`.
    TaskDelegation,
    /// Named `"task_result"`.
    TaskResult,
    /// Named `"graph_sync"`.
    GraphSync,
    /// Named `"capability_update"`.
    CapabilityUpdate,
    /// Named `"capability_query"`.
    CapabilityQuery,
    /// Named `"learning_share"`.
    LearningShare,
    /// Named `"proposal_submit"`.
    ProposalSubmit,
    /// Named `"proposal_vote"`.
    ProposalVote,
    /// Named `"workflow_assignment"`.
    WorkflowAssignment,
    /// Named `"workflow_stage_result"`.
    WorkflowStageResult,
    /// Any name that is not one of the built-in ones; holds that name.
    Custom(String),
}
impl MessageType {
pub fn as_str(&self) -> String {
match self {
MessageType::Query => "query".to_string(),
MessageType::Response => "response".to_string(),
MessageType::Notification => "notification".to_string(),
MessageType::TaskDelegation => "task_delegation".to_string(),
MessageType::TaskResult => "task_result".to_string(),
MessageType::GraphSync => "graph_sync".to_string(),
MessageType::CapabilityUpdate => "capability_update".to_string(),
MessageType::CapabilityQuery => "capability_query".to_string(),
MessageType::LearningShare => "learning_share".to_string(),
MessageType::ProposalSubmit => "proposal_submit".to_string(),
MessageType::ProposalVote => "proposal_vote".to_string(),
MessageType::WorkflowAssignment => "workflow_assignment".to_string(),
MessageType::WorkflowStageResult => "workflow_stage_result".to_string(),
MessageType::Custom(s) => s.clone(),
}
}
#[allow(clippy::should_implement_trait)]
pub fn from_str(s: &str) -> Self {
match s.to_lowercase().as_str() {
"query" => MessageType::Query,
"response" => MessageType::Response,
"notification" => MessageType::Notification,
"task_delegation" => MessageType::TaskDelegation,
"task_result" => MessageType::TaskResult,
"graph_sync" => MessageType::GraphSync,
"capability_update" => MessageType::CapabilityUpdate,
"capability_query" => MessageType::CapabilityQuery,
"learning_share" => MessageType::LearningShare,
"proposal_submit" => MessageType::ProposalSubmit,
"proposal_vote" => MessageType::ProposalVote,
"workflow_assignment" => MessageType::WorkflowAssignment,
"workflow_stage_result" => MessageType::WorkflowStageResult,
custom => MessageType::Custom(custom.to_string()),
}
}
}
/// A message as returned in [`PendingMessagesResponse::messages`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMessage {
    /// Identifier of the message.
    pub message_id: String,
    /// Identifier of the sending instance.
    pub source_instance: String,
    /// Identifier of the addressed instance, if any.
    // NOTE(review): `None` presumably means a broadcast — confirm against the
    // server implementation.
    pub target_instance: Option<String>,
    /// Kind of message.
    pub message_type: MessageType,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional identifier for correlating this message with another one.
    pub correlation_id: Option<String>,
    /// Creation timestamp of the message (UTC).
    pub created_at: DateTime<Utc>,
}
/// JSON body sent by [`MeshClient::send_message`] to
/// `/messages/send/{source_instance}`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SendMessageRequest {
    /// Identifier of the addressed instance, if any.
    pub target_instance: Option<String>,
    /// Kind of message being sent.
    pub message_type: MessageType,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Optional identifier for correlating this message with another one.
    pub correlation_id: Option<String>,
}
/// Body decoded by [`MeshClient::send_message`] from a successful send.
#[derive(Debug, Serialize, Deserialize)]
pub struct SendMessageResponse {
    /// Identifier of the message, as reported by the server.
    pub message_id: String,
    /// Status string reported by the server.
    pub status: String,
    /// Identifiers listed by the server as recipients of the message.
    pub delivered_to: Vec<String>,
}
/// Body decoded by [`MeshClient::get_messages`] from `/messages/{instance_id}`.
#[derive(Debug, Serialize, Deserialize)]
pub struct PendingMessagesResponse {
    /// Messages returned for the instance.
    pub messages: Vec<AgentMessage>,
}
/// HTTP client for the registry and messaging endpoints.
///
/// Holds the base URL built by [`MeshClient::new`] and the `reqwest` client
/// used for every request.
#[derive(Clone)]
pub struct MeshClient {
    /// Scheme, host and port prefix that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client.
    client: Client,
}
impl MeshClient {
pub fn new(host: &str, port: u16) -> Self {
Self {
base_url: format!("http://{}:{}", host, port),
client: Client::new(),
}
}
pub fn generate_instance_id() -> String {
let hostname = get_hostname()
.ok()
.and_then(|h| h.into_string().ok())
.unwrap_or_else(|| "unknown".to_string());
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
format!("{}-{}", hostname, uuid)
}
pub async fn register(
&self,
instance_id: String,
hostname: String,
port: u16,
capabilities: Vec<String>,
agent_profiles: Vec<String>,
) -> Result<RegisterResponse> {
let request = RegisterRequest {
instance_id,
hostname,
port,
capabilities,
agent_profiles,
};
let response = self
.client
.post(format!("{}/registry/register", self.base_url))
.json(&request)
.send()
.await?;
if response.status().is_success() {
Ok(response.json().await?)
} else {
anyhow::bail!("Registration failed: {}", response.status())
}
}
pub async fn heartbeat(
&self,
instance_id: &str,
metrics: Option<HashMap<String, serde_json::Value>>,
) -> Result<HeartbeatResponse> {
let request = HeartbeatRequest {
status: "healthy".to_string(),
metrics,
};
let response = self
.client
.post(format!(
"{}/registry/heartbeat/{}",
self.base_url, instance_id
))
.json(&request)
.send()
.await?;
if response.status().is_success() {
Ok(response.json().await?)
} else {
anyhow::bail!("Heartbeat failed: {}", response.status())
}
}
pub async fn list_instances(&self) -> Result<InstancesResponse> {
let response = self
.client
.get(format!("{}/registry/agents", self.base_url))
.send()
.await?;
if response.status().is_success() {
Ok(response.json().await?)
} else {
anyhow::bail!("Failed to list instances: {}", response.status())
}
}
pub async fn deregister(&self, instance_id: &str) -> Result<()> {
let response = self
.client
.delete(format!(
"{}/registry/deregister/{}",
self.base_url, instance_id
))
.send()
.await?;
if response.status().is_success() {
Ok(())
} else {
anyhow::bail!("Deregistration failed: {}", response.status())
}
}
pub async fn send_message(
&self,
source_instance: String,
target_instance: Option<String>,
message_type: MessageType,
payload: serde_json::Value,
correlation_id: Option<String>,
) -> Result<SendMessageResponse> {
let request = SendMessageRequest {
target_instance,
message_type,
payload,
correlation_id,
};
let response = self
.client
.post(format!(
"{}/messages/send/{}",
self.base_url, source_instance
))
.json(&request)
.send()
.await?;
if response.status().is_success() {
Ok(response.json().await?)
} else {
anyhow::bail!("Send message failed: {}", response.status())
}
}
pub async fn get_messages(&self, instance_id: &str) -> Result<PendingMessagesResponse> {
let response = self
.client
.get(format!("{}/messages/{}", self.base_url, instance_id))
.send()
.await?;
if response.status().is_success() {
Ok(response.json().await?)
} else {
anyhow::bail!("Failed to get messages: {}", response.status())
}
}
pub async fn acknowledge_messages(
&self,
instance_id: &str,
message_ids: Vec<String>,
) -> Result<()> {
let response = self
.client
.post(format!("{}/messages/{}/ack", self.base_url, instance_id))
.json(&json!({ "message_ids": message_ids }))
.send()
.await?;
if response.status().is_success() {
Ok(())
} else {
anyhow::bail!("Failed to acknowledge messages: {}", response.status())
}
}
}