use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
#[derive(Debug, Clone, Deserialize)]
#[cfg_attr(feature = "status-server", derive(utoipa::ToSchema))]
pub struct AddOrchestratorRequest {
pub url: String,
#[serde(default)]
pub bearer_token: Option<String>,
#[serde(default)]
pub id: Option<String>,
#[serde(default)]
pub agent_names: Vec<String>,
}
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "status-server", derive(utoipa::ToSchema))]
pub struct ActiveOrchestrator {
pub id: String,
pub url: String,
pub nats_url: String,
pub connected_agents: Vec<String>,
#[cfg_attr(feature = "status-server", schema(value_type = String, format = "date-time"))]
pub connected_at: DateTime<Utc>,
}
#[derive(Clone)]
pub struct OrchestratorRegistry {
entries: Arc<RwLock<Vec<ActiveOrchestrator>>>,
bearer_tokens: Arc<RwLock<std::collections::HashMap<String, String>>>,
add_tx: mpsc::UnboundedSender<AddOrchestratorRequest>,
}
impl OrchestratorRegistry {
pub fn new() -> (Self, mpsc::UnboundedReceiver<AddOrchestratorRequest>) {
let (tx, rx) = mpsc::unbounded_channel();
let registry = Self {
entries: Arc::new(RwLock::new(Vec::new())),
bearer_tokens: Arc::new(RwLock::new(std::collections::HashMap::new())),
add_tx: tx,
};
(registry, rx)
}
pub async fn list(&self) -> Vec<ActiveOrchestrator> {
self.entries.read().await.clone()
}
pub async fn add(&self, mut entry: ActiveOrchestrator) {
let mut entries = self.entries.write().await;
if let Some(existing) = entries.iter_mut().find(|e| e.id == entry.id) {
for agent in &existing.connected_agents {
if !entry.connected_agents.contains(agent) {
entry.connected_agents.push(agent.clone());
}
}
*existing = entry;
} else {
entries.push(entry);
}
}
pub async fn add_agent_to_orchestrator(&self, orch_id: &str, agent_name: &str) {
let mut entries = self.entries.write().await;
if let Some(entry) = entries.iter_mut().find(|e| e.id == orch_id) {
if !entry.connected_agents.iter().any(|a| a == agent_name) {
entry.connected_agents.push(agent_name.to_string());
}
}
}
pub async fn set_bearer_token(&self, orch_id: &str, token: String) {
self.bearer_tokens
.write()
.await
.insert(orch_id.to_string(), token);
}
pub async fn get_bearer_token(&self, orch_id: &str) -> Option<String> {
self.bearer_tokens.read().await.get(orch_id).cloned()
}
pub fn request_add(&self, req: AddOrchestratorRequest) -> Result<(), String> {
self.add_tx
.send(req)
.map_err(|e| format!("Runner channel closed: {}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn registry_starts_empty() {
let (registry, _rx) = OrchestratorRegistry::new();
assert!(registry.list().await.is_empty());
}
#[tokio::test]
async fn add_and_list_orchestrator() {
let (registry, _rx) = OrchestratorRegistry::new();
registry
.add(ActiveOrchestrator {
id: "test".into(),
url: "http://localhost:8080".into(),
nats_url: "nats://localhost:4222".into(),
connected_agents: vec!["ALPHA".into()],
connected_at: Utc::now(),
})
.await;
let list = registry.list().await;
assert_eq!(list.len(), 1);
assert_eq!(list[0].id, "test");
assert_eq!(list[0].connected_agents, vec!["ALPHA"]);
}
#[tokio::test]
async fn add_agent_to_existing_orchestrator() {
let (registry, _rx) = OrchestratorRegistry::new();
registry
.add(ActiveOrchestrator {
id: "orch1".into(),
url: "http://localhost:8080".into(),
nats_url: "nats://localhost:4222".into(),
connected_agents: vec!["ALPHA".into()],
connected_at: Utc::now(),
})
.await;
registry.add_agent_to_orchestrator("orch1", "BETA").await;
let list = registry.list().await;
assert_eq!(list[0].connected_agents, vec!["ALPHA", "BETA"]);
}
#[tokio::test]
async fn add_agent_deduplicates() {
let (registry, _rx) = OrchestratorRegistry::new();
registry
.add(ActiveOrchestrator {
id: "orch1".into(),
url: "http://localhost:8080".into(),
nats_url: "nats://localhost:4222".into(),
connected_agents: vec!["ALPHA".into()],
connected_at: Utc::now(),
})
.await;
registry.add_agent_to_orchestrator("orch1", "ALPHA").await;
let list = registry.list().await;
assert_eq!(list[0].connected_agents, vec!["ALPHA"]);
}
#[tokio::test]
async fn request_add_sends_through_channel() {
let (registry, mut rx) = OrchestratorRegistry::new();
registry
.request_add(AddOrchestratorRequest {
url: "http://orch-2:8080".into(),
bearer_token: Some("token123".into()),
id: Some("secondary".into()),
agent_names: vec!["ALPHA".into()],
})
.unwrap();
let req = rx.recv().await.unwrap();
assert_eq!(req.url, "http://orch-2:8080");
assert_eq!(req.id, Some("secondary".into()));
assert_eq!(req.agent_names, vec!["ALPHA"]);
}
#[tokio::test]
async fn upsert_preserves_connected_agents() {
let (registry, _rx) = OrchestratorRegistry::new();
registry
.add(ActiveOrchestrator {
id: "orch1".into(),
url: "http://localhost:8080".into(),
nats_url: "nats://localhost:4222".into(),
connected_agents: vec![],
connected_at: Utc::now(),
})
.await;
registry.add_agent_to_orchestrator("orch1", "ALPHA").await;
registry.add_agent_to_orchestrator("orch1", "BETA").await;
registry
.add(ActiveOrchestrator {
id: "orch1".into(),
url: "http://new-host:8080".into(),
nats_url: "nats://new-host:4222".into(),
connected_agents: vec![],
connected_at: Utc::now(),
})
.await;
let list = registry.list().await;
assert_eq!(list.len(), 1);
assert_eq!(list[0].url, "http://new-host:8080");
assert!(list[0].connected_agents.contains(&"ALPHA".to_string()));
assert!(list[0].connected_agents.contains(&"BETA".to_string()));
}
#[tokio::test]
async fn request_add_fails_when_receiver_dropped() {
let (registry, rx) = OrchestratorRegistry::new();
drop(rx);
let result = registry.request_add(AddOrchestratorRequest {
url: "http://orch:8080".into(),
bearer_token: None,
id: None,
agent_names: vec![],
});
assert!(result.is_err());
}
}