// spec_ai_api/api/mesh.rs

use crate::persistence::Persistence;
use anyhow::Result;
/// Mesh registry handlers and models
use axum::{
    extract::{Json, Path, State},
    http::StatusCode,
    response::IntoResponse,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
14
15/// Agent instance information in the mesh
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct MeshInstance {
18    pub instance_id: String,
19    pub hostname: String,
20    pub port: u16,
21    pub capabilities: Vec<String>,
22    pub is_leader: bool,
23    pub last_heartbeat: DateTime<Utc>,
24    pub created_at: DateTime<Utc>,
25    pub agent_profiles: Vec<String>,
26}
27
28/// Request to register a new instance
29#[derive(Debug, Serialize, Deserialize)]
30pub struct RegisterRequest {
31    pub instance_id: String,
32    pub hostname: String,
33    pub port: u16,
34    pub capabilities: Vec<String>,
35    pub agent_profiles: Vec<String>,
36}
37
38/// Response from registration
39#[derive(Debug, Serialize, Deserialize)]
40pub struct RegisterResponse {
41    pub success: bool,
42    pub instance_id: String,
43    pub is_leader: bool,
44    pub leader_id: Option<String>,
45    pub peers: Vec<MeshInstance>,
46}
47
48/// List of registered instances
49#[derive(Debug, Serialize, Deserialize)]
50pub struct InstancesResponse {
51    pub instances: Vec<MeshInstance>,
52    pub leader_id: Option<String>,
53}
54
55/// Heartbeat request
56#[derive(Debug, Serialize, Deserialize)]
57pub struct HeartbeatRequest {
58    pub status: String,
59    pub metrics: Option<HashMap<String, serde_json::Value>>,
60}
61
62/// Heartbeat response
63#[derive(Debug, Serialize, Deserialize)]
64pub struct HeartbeatResponse {
65    pub acknowledged: bool,
66    pub leader_id: Option<String>,
67    pub should_sync: bool,
68}
69
70/// Message types for inter-agent communication
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72pub enum MessageType {
73    Query,          // Request information from another agent
74    Response,       // Response to a query
75    Notification,   // One-way notification
76    TaskDelegation, // Delegate a task to another agent
77    TaskResult,     // Result of a delegated task
78    GraphSync,      // Knowledge graph synchronization
79    Custom(String), // Custom message type
80}
81
82impl MessageType {
83    pub fn as_str(&self) -> String {
84        match self {
85            MessageType::Query => "query".to_string(),
86            MessageType::Response => "response".to_string(),
87            MessageType::Notification => "notification".to_string(),
88            MessageType::TaskDelegation => "task_delegation".to_string(),
89            MessageType::TaskResult => "task_result".to_string(),
90            MessageType::GraphSync => "graph_sync".to_string(),
91            MessageType::Custom(s) => s.clone(),
92        }
93    }
94
95    pub fn from_str(s: &str) -> Self {
96        match s.to_lowercase().as_str() {
97            "query" => MessageType::Query,
98            "response" => MessageType::Response,
99            "notification" => MessageType::Notification,
100            "task_delegation" => MessageType::TaskDelegation,
101            "task_result" => MessageType::TaskResult,
102            "graph_sync" => MessageType::GraphSync,
103            custom => MessageType::Custom(custom.to_string()),
104        }
105    }
106}
107
108/// Inter-agent message
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct AgentMessage {
111    pub message_id: String,
112    pub source_instance: String,
113    pub target_instance: Option<String>, // None for broadcast
114    pub message_type: MessageType,
115    pub payload: serde_json::Value,
116    pub correlation_id: Option<String>, // For request/response correlation
117    pub created_at: DateTime<Utc>,
118}
119
120/// Message send request
121#[derive(Debug, Serialize, Deserialize)]
122pub struct SendMessageRequest {
123    pub target_instance: Option<String>, // None for broadcast
124    pub message_type: MessageType,
125    pub payload: serde_json::Value,
126    pub correlation_id: Option<String>,
127}
128
129/// Message send response
130#[derive(Debug, Serialize, Deserialize)]
131pub struct SendMessageResponse {
132    pub message_id: String,
133    pub status: String,
134    pub delivered_to: Vec<String>,
135}
136
137/// Pending messages response
138#[derive(Debug, Serialize, Deserialize)]
139pub struct PendingMessagesResponse {
140    pub messages: Vec<AgentMessage>,
141}
142
143/// Mesh registry state
144#[derive(Clone)]
145pub struct MeshRegistry {
146    instances: Arc<RwLock<HashMap<String, MeshInstance>>>,
147    leader_id: Arc<RwLock<Option<String>>>,
148    message_queue: Arc<RwLock<Vec<AgentMessage>>>,
149    persistence: Option<Persistence>,
150}
151
152impl MeshRegistry {
153    pub fn new() -> Self {
154        Self {
155            instances: Arc::new(RwLock::new(HashMap::new())),
156            leader_id: Arc::new(RwLock::new(None)),
157            message_queue: Arc::new(RwLock::new(Vec::new())),
158            persistence: None,
159        }
160    }
161
162    pub fn with_persistence(persistence: Persistence) -> Self {
163        Self {
164            instances: Arc::new(RwLock::new(HashMap::new())),
165            leader_id: Arc::new(RwLock::new(None)),
166            message_queue: Arc::new(RwLock::new(Vec::new())),
167            persistence: Some(persistence),
168        }
169    }
170
171    /// Register a new instance
172    pub async fn register(&self, instance: MeshInstance) -> RegisterResponse {
173        let mut instances = self.instances.write().await;
174        let mut leader = self.leader_id.write().await;
175
176        // First instance becomes the leader
177        let is_leader = instances.is_empty();
178        let mut new_instance = instance.clone();
179        new_instance.is_leader = is_leader;
180
181        if is_leader {
182            *leader = Some(instance.instance_id.clone());
183        }
184
185        instances.insert(instance.instance_id.clone(), new_instance);
186
187        RegisterResponse {
188            success: true,
189            instance_id: instance.instance_id.clone(),
190            is_leader,
191            leader_id: leader.clone(),
192            peers: instances.values().cloned().collect(),
193        }
194    }
195
196    /// Update heartbeat timestamp
197    pub async fn heartbeat(&self, instance_id: &str) -> HeartbeatResponse {
198        let mut instances = self.instances.write().await;
199        let leader = self.leader_id.read().await;
200
201        if let Some(instance) = instances.get_mut(instance_id) {
202            instance.last_heartbeat = Utc::now();
203            HeartbeatResponse {
204                acknowledged: true,
205                leader_id: leader.clone(),
206                should_sync: false,
207            }
208        } else {
209            HeartbeatResponse {
210                acknowledged: false,
211                leader_id: leader.clone(),
212                should_sync: false,
213            }
214        }
215    }
216
217    /// Remove an instance
218    pub async fn deregister(&self, instance_id: &str) -> bool {
219        let mut instances = self.instances.write().await;
220        let mut leader = self.leader_id.write().await;
221
222        if let Some(instance) = instances.remove(instance_id) {
223            // If leader is leaving, elect a new one
224            if instance.is_leader && !instances.is_empty() {
225                // Simple election: first remaining instance becomes leader
226                if let Some((new_leader_id, new_leader)) = instances.iter_mut().next() {
227                    new_leader.is_leader = true;
228                    *leader = Some(new_leader_id.clone());
229                }
230            } else if instances.is_empty() {
231                *leader = None;
232            }
233            true
234        } else {
235            false
236        }
237    }
238
239    /// Get all instances
240    pub async fn list(&self) -> Vec<MeshInstance> {
241        let instances = self.instances.read().await;
242        instances.values().cloned().collect()
243    }
244
245    /// Check for stale instances and remove them
246    pub async fn cleanup_stale(&self, timeout_secs: u64) {
247        let now = Utc::now();
248        let mut instances = self.instances.write().await;
249        let mut leader = self.leader_id.write().await;
250
251        let stale_ids: Vec<String> = instances
252            .iter()
253            .filter_map(|(id, instance)| {
254                let elapsed = now.timestamp() - instance.last_heartbeat.timestamp();
255                if elapsed > timeout_secs as i64 {
256                    Some(id.clone())
257                } else {
258                    None
259                }
260            })
261            .collect();
262
263        for id in stale_ids {
264            if let Some(instance) = instances.remove(&id) {
265                // Handle leader failover if needed
266                if instance.is_leader && !instances.is_empty() {
267                    if let Some((new_leader_id, new_leader)) = instances.iter_mut().next() {
268                        new_leader.is_leader = true;
269                        *leader = Some(new_leader_id.clone());
270                    }
271                }
272            }
273        }
274
275        if instances.is_empty() {
276            *leader = None;
277        }
278    }
279
280    /// Get the current leader ID
281    pub async fn get_leader(&self) -> Option<String> {
282        let leader = self.leader_id.read().await;
283        leader.clone()
284    }
285
286    /// Send a message to an instance or broadcast
287    pub async fn send_message(
288        &self,
289        source_instance: String,
290        target_instance: Option<String>,
291        message_type: MessageType,
292        payload: serde_json::Value,
293        correlation_id: Option<String>,
294    ) -> Result<SendMessageResponse> {
295        // Generate time-ordered UUID v7 for better database performance and distributed safety
296        let message_id = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)).to_string();
297
298        let message = AgentMessage {
299            message_id: message_id.clone(),
300            source_instance,
301            target_instance: target_instance.clone(),
302            message_type,
303            payload,
304            correlation_id,
305            created_at: Utc::now(),
306        };
307
308        // Persist to database if available
309        if let Some(ref persistence) = self.persistence {
310            let target_str = target_instance.as_deref();
311            if let Err(e) = persistence.mesh_message_store(
312                &message_id,
313                &message.source_instance,
314                target_str,
315                &message.message_type.as_str(),
316                &message.payload,
317                "pending",
318            ) {
319                tracing::warn!("Failed to persist mesh message: {}", e);
320            }
321        }
322
323        // Add to message queue
324        let mut queue = self.message_queue.write().await;
325        queue.push(message.clone());
326
327        // GraphSync messages are handled when retrieved from the queue
328        // to avoid recursion issues
329
330        // Determine who received it
331        let delivered_to = if let Some(ref target) = target_instance {
332            let instances = self.instances.read().await;
333            if instances.contains_key(target) {
334                vec![target.clone()]
335            } else {
336                return Err(anyhow::anyhow!("Target instance '{}' not found", target));
337            }
338        } else {
339            // Broadcast - delivered to all instances
340            let instances = self.instances.read().await;
341            instances.keys().cloned().collect()
342        };
343
344        Ok(SendMessageResponse {
345            message_id,
346            status: "queued".to_string(),
347            delivered_to,
348        })
349    }
350
351    /// Get pending messages for an instance
352    pub async fn get_pending_messages(&self, instance_id: &str) -> Vec<AgentMessage> {
353        let queue = self.message_queue.read().await;
354        queue
355            .iter()
356            .filter(|msg| {
357                // Return messages targeted at this instance or broadcasts (None)
358                msg.target_instance.as_deref() == Some(instance_id) || msg.target_instance.is_none()
359            })
360            .cloned()
361            .collect()
362    }
363
364    /// Acknowledge/remove messages after delivery
365    pub async fn acknowledge_messages(&self, message_ids: Vec<String>) {
366        let mut queue = self.message_queue.write().await;
367        queue.retain(|msg| !message_ids.contains(&msg.message_id));
368    }
369}
370
371/// Client-side mesh operations
372#[derive(Clone)]
373pub struct MeshClient {
374    base_url: String,
375    client: reqwest::Client,
376}
377
378impl MeshClient {
379    pub fn new(host: &str, port: u16) -> Self {
380        Self {
381            base_url: format!("http://{}:{}", host, port),
382            client: reqwest::Client::new(),
383        }
384    }
385
386    /// Generate a unique instance ID
387    pub fn generate_instance_id() -> String {
388        let hostname = hostname::get()
389            .ok()
390            .and_then(|h| h.into_string().ok())
391            .unwrap_or_else(|| "unknown".to_string());
392        // Use UUID v7 for time-ordered, globally unique IDs with better collision resistance
393        let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
394        format!("{}-{}", hostname, uuid)
395    }
396
397    /// Register this instance with a mesh registry
398    pub async fn register(
399        &self,
400        instance_id: String,
401        hostname: String,
402        port: u16,
403        capabilities: Vec<String>,
404        agent_profiles: Vec<String>,
405    ) -> Result<RegisterResponse> {
406        let request = RegisterRequest {
407            instance_id,
408            hostname,
409            port,
410            capabilities,
411            agent_profiles,
412        };
413
414        let response = self
415            .client
416            .post(format!("{}/registry/register", self.base_url))
417            .json(&request)
418            .send()
419            .await?;
420
421        if response.status().is_success() {
422            Ok(response.json().await?)
423        } else {
424            anyhow::bail!("Registration failed: {}", response.status())
425        }
426    }
427
428    /// Send heartbeat to registry
429    pub async fn heartbeat(
430        &self,
431        instance_id: &str,
432        metrics: Option<HashMap<String, serde_json::Value>>,
433    ) -> Result<HeartbeatResponse> {
434        let request = HeartbeatRequest {
435            status: "healthy".to_string(),
436            metrics,
437        };
438
439        let response = self
440            .client
441            .post(format!(
442                "{}/registry/heartbeat/{}",
443                self.base_url, instance_id
444            ))
445            .json(&request)
446            .send()
447            .await?;
448
449        if response.status().is_success() {
450            Ok(response.json().await?)
451        } else {
452            anyhow::bail!("Heartbeat failed: {}", response.status())
453        }
454    }
455
456    /// List all instances in the mesh
457    pub async fn list_instances(&self) -> Result<InstancesResponse> {
458        let response = self
459            .client
460            .get(format!("{}/registry/agents", self.base_url))
461            .send()
462            .await?;
463
464        if response.status().is_success() {
465            Ok(response.json().await?)
466        } else {
467            anyhow::bail!("Failed to list instances: {}", response.status())
468        }
469    }
470
471    /// Deregister from the mesh
472    pub async fn deregister(&self, instance_id: &str) -> Result<()> {
473        let response = self
474            .client
475            .delete(format!(
476                "{}/registry/deregister/{}",
477                self.base_url, instance_id
478            ))
479            .send()
480            .await?;
481
482        if response.status().is_success() {
483            Ok(())
484        } else {
485            anyhow::bail!("Deregistration failed: {}", response.status())
486        }
487    }
488
489    /// Send a message to another instance
490    pub async fn send_message(
491        &self,
492        source_instance: String,
493        target_instance: Option<String>,
494        message_type: MessageType,
495        payload: serde_json::Value,
496        correlation_id: Option<String>,
497    ) -> Result<SendMessageResponse> {
498        let request = SendMessageRequest {
499            target_instance,
500            message_type,
501            payload,
502            correlation_id,
503        };
504
505        let response = self
506            .client
507            .post(format!(
508                "{}/messages/send/{}",
509                self.base_url, source_instance
510            ))
511            .json(&request)
512            .send()
513            .await?;
514
515        if response.status().is_success() {
516            Ok(response.json().await?)
517        } else {
518            anyhow::bail!("Send message failed: {}", response.status())
519        }
520    }
521
522    /// Get pending messages for an instance
523    pub async fn get_messages(&self, instance_id: &str) -> Result<PendingMessagesResponse> {
524        let response = self
525            .client
526            .get(format!("{}/messages/{}", self.base_url, instance_id))
527            .send()
528            .await?;
529
530        if response.status().is_success() {
531            Ok(response.json().await?)
532        } else {
533            anyhow::bail!("Get messages failed: {}", response.status())
534        }
535    }
536
537    /// Acknowledge received messages
538    pub async fn acknowledge_messages(
539        &self,
540        instance_id: &str,
541        message_ids: Vec<String>,
542    ) -> Result<()> {
543        let request = AcknowledgeMessagesRequest { message_ids };
544
545        let response = self
546            .client
547            .post(format!("{}/messages/ack/{}", self.base_url, instance_id))
548            .json(&request)
549            .send()
550            .await?;
551
552        if response.status().is_success() {
553            Ok(())
554        } else {
555            anyhow::bail!("Acknowledge failed: {}", response.status())
556        }
557    }
558}
559
560/// Extension trait to add mesh registry to app state
561pub trait MeshState {
562    fn mesh_registry(&self) -> &MeshRegistry;
563}
564
565/// Handler: Register a new instance
566pub async fn register_instance<S: MeshState>(
567    State(state): State<S>,
568    Json(request): Json<RegisterRequest>,
569) -> impl IntoResponse {
570    let instance = MeshInstance {
571        instance_id: request.instance_id,
572        hostname: request.hostname,
573        port: request.port,
574        capabilities: request.capabilities,
575        is_leader: false, // Will be set by registry
576        last_heartbeat: Utc::now(),
577        created_at: Utc::now(),
578        agent_profiles: request.agent_profiles,
579    };
580
581    let response = state.mesh_registry().register(instance).await;
582    (StatusCode::OK, Json(response))
583}
584
585/// Handler: List all instances
586pub async fn list_instances<S: MeshState>(State(state): State<S>) -> impl IntoResponse {
587    let instances = state.mesh_registry().list().await;
588    let leader_id = instances
589        .iter()
590        .find(|i| i.is_leader)
591        .map(|i| i.instance_id.clone());
592
593    Json(InstancesResponse {
594        instances,
595        leader_id,
596    })
597}
598
599/// Handler: Heartbeat from an instance
600pub async fn heartbeat<S: MeshState>(
601    State(state): State<S>,
602    Path(instance_id): Path<String>,
603    Json(_request): Json<HeartbeatRequest>,
604) -> impl IntoResponse {
605    let response = state.mesh_registry().heartbeat(&instance_id).await;
606
607    if response.acknowledged {
608        (StatusCode::OK, Json(response))
609    } else {
610        (StatusCode::NOT_FOUND, Json(response))
611    }
612}
613
614/// Handler: Deregister an instance
615pub async fn deregister_instance<S: MeshState>(
616    State(state): State<S>,
617    Path(instance_id): Path<String>,
618) -> impl IntoResponse {
619    let removed = state.mesh_registry().deregister(&instance_id).await;
620
621    if removed {
622        StatusCode::NO_CONTENT
623    } else {
624        StatusCode::NOT_FOUND
625    }
626}
627
628/// Handler: Send a message to another instance
629pub async fn send_message<S: MeshState>(
630    State(state): State<S>,
631    Path(source_instance): Path<String>,
632    Json(request): Json<SendMessageRequest>,
633) -> impl IntoResponse {
634    match state
635        .mesh_registry()
636        .send_message(
637            source_instance,
638            request.target_instance,
639            request.message_type,
640            request.payload,
641            request.correlation_id,
642        )
643        .await
644    {
645        Ok(response) => (StatusCode::OK, Json(response)).into_response(),
646        Err(e) => (
647            StatusCode::BAD_REQUEST,
648            Json(serde_json::json!({
649                "error": e.to_string()
650            })),
651        )
652            .into_response(),
653    }
654}
655
656/// Handler: Get pending messages for an instance
657pub async fn get_messages<S: MeshState>(
658    State(state): State<S>,
659    Path(instance_id): Path<String>,
660) -> impl IntoResponse {
661    let messages = state
662        .mesh_registry()
663        .get_pending_messages(&instance_id)
664        .await;
665
666    Json(PendingMessagesResponse { messages })
667}
668
669/// Acknowledge messages request
670#[derive(Debug, Serialize, Deserialize)]
671pub struct AcknowledgeMessagesRequest {
672    pub message_ids: Vec<String>,
673}
674
675/// Handler: Acknowledge received messages
676pub async fn acknowledge_messages<S: MeshState>(
677    State(state): State<S>,
678    Path(_instance_id): Path<String>,
679    Json(request): Json<AcknowledgeMessagesRequest>,
680) -> impl IntoResponse {
681    state
682        .mesh_registry()
683        .acknowledge_messages(request.message_ids)
684        .await;
685
686    StatusCode::NO_CONTENT
687}