use crate::cli::remote::{AgentInfo, RemoteError, RemoteOrchestrator};
use crate::cli::workspace::{OrchestratorMode, QuorumConfig};
use crate::nats_utils::NatsAuth;
use crate::serve::{ServeOptions, serve_fleet};
use anyhow::{Context, Result};
use std::path::{Path, PathBuf};
/// Seconds the background task spawned by `run` sleeps before it performs the
/// registration self-check; the same value is interpolated into the
/// "NOT visible at orchestrator after …s" log message.
const SELFCHECK_DELAY_SECS: u64 = 20;
/// Outcome of checking a single agent id against the orchestrator's agent
/// listing, as computed by `evaluate_registration`.
#[derive(Debug, PartialEq, Eq)]
enum RegVerdict {
/// Listed with an operator that is neither empty nor `local`, and with at
/// least one operator tag.
Ok,
/// No entry with this agent id exists in the listing.
Dropped,
/// Listed, but the operator field is absent or an empty string.
Unattributed,
/// Listed, but the operator is the literal string `local`.
LocalFallback,
/// Listed with a non-empty, non-`local` operator whose tag list is empty.
NoOperatorTags,
}
/// Classifies how the orchestrator sees `agent_id`, given its agent listing.
///
/// Returns `RegVerdict::Dropped` when no entry in `agents` carries a matching
/// `agent_id`. Otherwise the verdict is derived from that entry's `operator`
/// and `operator_tags` fields, checked in this order: missing/empty operator,
/// the literal `local` operator, empty tag list, and finally `Ok`.
fn evaluate_registration(agent_id: &str, agents: &[AgentInfo]) -> RegVerdict {
    let entry = match agents.iter().find(|candidate| candidate.agent_id == agent_id) {
        Some(found) => found,
        None => return RegVerdict::Dropped,
    };

    // An absent operator and an empty one are deliberately indistinguishable.
    let operator = entry.operator.as_deref().unwrap_or("");
    if operator.is_empty() {
        RegVerdict::Unattributed
    } else if operator == "local" {
        RegVerdict::LocalFallback
    } else if entry.operator_tags.is_empty() {
        RegVerdict::NoOperatorTags
    } else {
        RegVerdict::Ok
    }
}
/// Fetches the orchestrator's agent listing once and logs, per id in
/// `agent_ids`, how that agent is registered there.
///
/// This is purely diagnostic: nothing is returned, and if the listing cannot
/// be fetched a warning is logged and the check is skipped.
async fn run_registration_selfcheck(orch: &RemoteOrchestrator, agent_ids: &[String]) {
    let listing = orch
        .agents()
        .await
        .inspect_err(|e| {
            tracing::warn!(error = %e, "registration self-check skipped: GET /agents failed");
        })
        .ok();
    let Some(known_agents) = listing else {
        return;
    };

    for id in agent_ids.iter() {
        let verdict = evaluate_registration(id, &known_agents);
        match verdict {
            // Hard failures: the agent cannot receive jobs in this state.
            RegVerdict::Dropped => tracing::error!(
                agent_id = %id,
                "NOT visible at orchestrator after {SELFCHECK_DELAY_SECS}s — heartbeat dropped \
                 (no operator link) or not registered. This agent will receive no jobs. \
                 Re-redeem the agent code with operator_name set."
            ),
            RegVerdict::Unattributed => tracing::error!(
                agent_id = %id,
                "registered but has NO operator at orchestrator — will fail grant-based \
                 eligibility and receive no jobs"
            ),
            // Registered, but attribution is incomplete.
            RegVerdict::LocalFallback => tracing::warn!(
                agent_id = %id,
                "operator is `local` at orchestrator — the agent code was minted without \
                 operator_name, so it has no grants/tags and will fail eligibility. \
                 Re-mint + redeem the agent code with operator_name set."
            ),
            RegVerdict::NoOperatorTags => tracing::warn!(
                agent_id = %id,
                "operator set but has no tags at orchestrator — will fail grant-based \
                 room eligibility"
            ),
            RegVerdict::Ok => {
                tracing::info!(agent_id = %id, "registered and attributed at orchestrator")
            }
        }
    }
}
/// Builds an HTTP client for the remote orchestrator wired to the selected room.
///
/// The address and token come from [`resolve_remote_orchestrator`], so this
/// client always targets the same orchestrator that agent registration uses.
///
/// Returns `None` when no remote orchestrator resolves (see
/// [`resolve_remote_orchestrator`]) or when `RemoteOrchestrator::new` fails.
fn try_build_orchestrator_client(
    workspace_path: &Path,
    room_flag: Option<&str>,
) -> Option<RemoteOrchestrator> {
    let (address, token) = resolve_remote_orchestrator(workspace_path, room_flag)?;
    RemoteOrchestrator::new(address.as_str(), &token).ok()
}

/// Resolves `(address, token)` for the remote orchestrator of the selected room.
///
/// Loads the workspace at `workspace_path`, resolves the room (`room_flag`, or
/// whatever `resolve_room` picks for `None`), and looks up the orchestrator the
/// room names. The token is passed through `crate::config::resolve_env_token`
/// before being returned.
///
/// Returns `None` — without reporting why — when the workspace cannot be
/// loaded, the room cannot be resolved, the room names no orchestrator, the
/// orchestrator is not in the workspace, it is in embedded mode, or it lacks
/// an `address` or `token`. An orchestrator with no `mode` is treated as remote.
pub(crate) fn resolve_remote_orchestrator(
    workspace_path: &Path,
    room_flag: Option<&str>,
) -> Option<(String, String)> {
    let workspace = QuorumConfig::load_workspace(workspace_path).ok()?;
    let (_room_name, room) = workspace.resolve_room(room_flag).ok()?;
    let orch_name = room.orchestrator.as_deref()?;
    let orch = workspace.orchestrators.get(orch_name)?;
    match orch.mode.as_ref() {
        Some(OrchestratorMode::Embedded) => None,
        Some(OrchestratorMode::Remote) | None => {
            let address = orch.address.as_deref()?;
            let raw_token = orch.token.as_deref()?;
            let token = crate::config::resolve_env_token("token", raw_token);
            Some((address.to_string(), token))
        }
    }
}
/// Registers the selected fleet agents with the orchestrator, one at a time,
/// and returns the connection details handed back for each success.
///
/// * `orch_url` / `bearer` — orchestrator address and operator token passed to
///   `register_with_orchestrator_with_retry` (called with `5` as its last
///   argument).
/// * `fleet` — source of agent names.
/// * `agent_filter` — when `Some`, only agents whose name appears in the slice
///   are registered; `None` selects every agent.
///
/// The returned map is keyed by agent name. Agents whose registration fails
/// are logged and left out. If `bearer` is blank, nothing is registered and
/// the map is empty.
async fn register_fleet_agents(
    orch_url: &str,
    bearer: &str,
    fleet: &crate::config::AgentFleetConfig,
    agent_filter: Option<&[String]>,
) -> std::collections::HashMap<String, crate::serve::AgentConn> {
    let mut registered = std::collections::HashMap::new();

    // Without a token every registration would be rejected; bail out early.
    if bearer.trim().is_empty() {
        tracing::error!(
            "no operator token resolved for agent registration — agents will be unattributed \
             and DROPPED by the orchestrator. Set the orchestrator `token` in your workspace \
             (the operator token from `quorum redeem` / `init --invite`)."
        );
        return registered;
    }

    let selected: Vec<String> = fleet
        .agents
        .iter()
        .filter_map(|agent| {
            let wanted = agent_filter.is_none_or(|allow| allow.iter().any(|n| n == &agent.name));
            wanted.then(|| agent.name.clone())
        })
        .collect();

    for agent_name in selected {
        let outcome = crate::nats_utils::register_with_orchestrator_with_retry(
            orch_url,
            &agent_name,
            bearer,
            5,
        )
        .await;
        let reg = match outcome {
            Ok(reg) => reg,
            Err(e) => {
                tracing::error!(
                    agent = %agent_name,
                    error = %e,
                    "agent registration FAILED — this agent will be unattributed and DROPPED. \
                     Check the operator token carries the `manage_agents` role."
                );
                continue;
            }
        };
        tracing::info!(agent = %agent_name, "registered + attributed under operator");
        let conn = crate::serve::AgentConn {
            nats_url: reg.nats_url,
            nats_auth: Some(NatsAuth {
                inline_creds: Some(reg.creds),
                ..Default::default()
            }),
        };
        registered.insert(agent_name, conn);
    }
    registered
}
/// Fleet config locations probed, in this order, by `resolve_config_path` when
/// no explicit config path is given. The entries are relative paths, so they
/// are resolved against the process's current working directory.
const DEFAULT_FLEET_PATHS: &[&str] = &[
"quorum.yml",
"quorum.yaml",
"agent.yml",
"config/agent.yml",
"config/default.yml",
];
/// Loads a fleet config from `path`, accepting either supported format.
///
/// The file is first loaded as a unified `QuorumConfig` and converted with
/// `to_fleet()`. If that fails, it is loaded with `crate::config::load_config`
/// (the legacy fleet format) instead.
///
/// # Errors
///
/// Fails only when both loaders reject the file. The returned error is the
/// legacy loader's error, with the unified loader's error attached as context
/// so a broken unified config is not reported solely as a legacy-format
/// problem.
pub(crate) fn load_fleet_unified(path: &Path) -> Result<crate::config::AgentFleetConfig> {
    match QuorumConfig::load(path) {
        Ok(unified) => Ok(unified.to_fleet()),
        Err(unified_err) => crate::config::load_config(path).with_context(|| {
            format!(
                "legacy fleet-config load attempted because the unified quorum-config load \
                 failed with: {unified_err:#}"
            )
        }),
    }
}
/// Returns `<home>/.nsed/agent.creds`, the default NATS credentials location.
///
/// `<home>` is the first of the `HOME` and `USERPROFILE` environment variables
/// that is set to a non-empty value. An empty value is treated as unset:
/// joining onto an empty base would yield the *relative* path
/// `.nsed/agent.creds`, which callers would then resolve against the current
/// working directory.
///
/// Returns `None` when neither variable holds a usable value. The file itself
/// is not checked for existence here.
fn default_creds_path() -> Option<PathBuf> {
    let home = ["HOME", "USERPROFILE"]
        .iter()
        .filter_map(|key| std::env::var_os(key))
        .find(|value| !value.is_empty())?;
    let mut creds = PathBuf::from(home);
    creds.push(".nsed");
    creds.push("agent.creds");
    Some(creds)
}
/// Picks the fleet config file to load.
///
/// An explicit `config` path is returned as-is, without checking that it
/// exists. Otherwise the first entry of `DEFAULT_FLEET_PATHS` that exists
/// (relative to the current working directory) is returned.
///
/// # Errors
///
/// Fails when no explicit path is given and none of the defaults exist; the
/// message lists the paths that were tried.
fn resolve_config_path(config: Option<&Path>) -> Result<PathBuf> {
    if let Some(explicit) = config {
        return Ok(explicit.to_path_buf());
    }

    DEFAULT_FLEET_PATHS
        .iter()
        .map(|candidate| PathBuf::from(candidate))
        .find(|candidate| candidate.exists())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "no fleet config found. Pass --config PATH or create one of: {}",
                DEFAULT_FLEET_PATHS.join(", ")
            )
        })
}
/// Chooses the NATS credentials file, if any.
///
/// An explicit `creds_arg` always wins and is not checked for existence.
/// Otherwise the path from `default_creds_path()` is used, but only if it
/// exists on disk. Returns `None` when neither source yields a file.
///
/// The chosen path is stored via `Path::display()`, i.e. as a lossily
/// converted string.
fn resolve_nats_auth(creds_arg: Option<&Path>) -> Option<NatsAuth> {
    let creds_file = match creds_arg {
        Some(explicit) => explicit.display().to_string(),
        None => {
            let fallback = default_creds_path()?;
            if !fallback.exists() {
                return None;
            }
            fallback.display().to_string()
        }
    };
    Some(NatsAuth {
        creds_file: Some(creds_file),
        ..Default::default()
    })
}
async fn resolve_nats_url(
nats_url_flag: Option<&str>,
fleet_nats_url: Option<&str>,
workspace_path: &Path,
room_flag: Option<&str>,
) -> Result<String> {
if let Some(u) = nats_url_flag {
return Ok(u.to_string());
}
if let Some(u) = fleet_nats_url {
return Ok(u.to_string());
}
let workspace = QuorumConfig::load_workspace(workspace_path).with_context(|| {
format!(
"no --nats-url passed, no telemetry NATS URL in the fleet config, and workspace \
config not loadable at {}",
workspace_path.display()
)
})?;
let (room_name, room) = workspace
.resolve_room(room_flag)
.with_context(|| "could not resolve a room for NATS-URL lookup")?;
let orch_name = room.orchestrator.as_deref().ok_or_else(|| {
anyhow::anyhow!(
"room `{room_name}` has no orchestrator wired — set `orchestrator: <name>` \
or pass --nats-url"
)
})?;
let orch = workspace.orchestrators.get(orch_name).ok_or_else(|| {
anyhow::anyhow!(
"room `{room_name}` references orchestrator `{orch_name}` which is not in \
workspace.orchestrators"
)
})?;
match orch.mode.as_ref() {
Some(OrchestratorMode::Embedded) => orch.nats_url.clone().ok_or_else(|| {
anyhow::anyhow!(
"orchestrator `{orch_name}` is embedded but has no `nats_url` set — fix the \
workspace config or pass --nats-url"
)
}),
Some(OrchestratorMode::Remote) | None => {
let address = orch.address.as_deref().ok_or_else(|| {
anyhow::anyhow!(
"orchestrator `{orch_name}` is remote but has no `address` — fix the \
workspace config or pass --nats-url"
)
})?;
let token_raw = orch.token.as_deref().ok_or_else(|| {
anyhow::anyhow!(
"orchestrator `{orch_name}` is remote but has no `token` — fix the \
workspace config or pass --nats-url"
)
})?;
let token = crate::config::resolve_env_token("token", token_raw);
let client = RemoteOrchestrator::new(address, &token).with_context(|| {
format!("building HTTP client for orchestrator `{orch_name}` at {address}")
})?;
client.runtime_nats().await.map_err(|e| match e {
RemoteError::ApiError { status, body } => anyhow::anyhow!(
"orchestrator `{orch_name}` at {address} returned {status} on \
/api/runtime/nats: {body}. Pass --nats-url to bypass."
),
other => anyhow::anyhow!(
"querying orchestrator `{orch_name}` at {address} for NATS URL: {other}"
),
})
}
}
}
/// Entry point for serving an agent fleet.
///
/// In order, this:
/// 1. installs the default tracing setup and, if `dashboard_bind` is given,
///    exports it as the `QUORUM_DASHBOARD_BIND` environment variable;
/// 2. resolves and loads the fleet config (`config`, or the first existing
///    default path);
/// 3. resolves the NATS URL (`nats_url` flag, then fleet telemetry, then the
///    workspace's orchestrator);
/// 4. registers the selected agents when a remote orchestrator resolves;
/// 5. spawns a one-shot registration self-check that fires after
///    `SELFCHECK_DELAY_SECS` unless cancelled first;
/// 6. runs `serve_fleet` until it returns or a shutdown signal arrives.
///
/// `stream_name` defaults to `"sphera_jobs"` and `api_prefix` to `"sphera"`.
/// `agent_filter`, when `Some`, restricts both registration and the
/// self-check to the named agents.
///
/// # Errors
///
/// Returns an error if the config path, fleet config, or NATS URL cannot be
/// resolved, or whatever `serve_fleet` returns. A shutdown signal yields `Ok`.
// The parameters mirror the CLI flags one-to-one, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
pub async fn run(
config: Option<&Path>,
workspace_path: &Path,
room: Option<&str>,
nats_url: Option<&str>,
nats_creds: Option<&Path>,
agent_filter: Option<&[String]>,
stream_name: Option<&str>,
api_prefix: Option<&str>,
dashboard_port: Option<u16>,
dashboard_bind: Option<&str>,
) -> Result<()> {
crate::serve::install_default_tracing();
if let Some(bind) = dashboard_bind {
// NOTE(review): `set_var` is only sound while no other thread is reading
// or writing the process environment. This runs inside an async fn, so
// runtime worker threads may already exist — TODO confirm this cannot race.
unsafe {
std::env::set_var("QUORUM_DASHBOARD_BIND", bind);
}
}
let config_path = resolve_config_path(config)?;
// If the config file parses as a unified QuorumConfig it doubles as the
// workspace; otherwise the separate `workspace_path` is used.
let is_unified = QuorumConfig::load(&config_path).is_ok();
let fleet = load_fleet_unified(&config_path)
.with_context(|| format!("failed to load fleet config at {}", config_path.display()))?;
let effective_workspace: &Path = if is_unified {
config_path.as_path()
} else {
workspace_path
};
// First telemetry endpoint that declares a NATS URL, if any.
let fleet_nats_url = fleet
.telemetry
.endpoints
.iter()
.find_map(|e| e.nats_url.as_deref());
let resolved_nats_url =
resolve_nats_url(nats_url, fleet_nats_url, effective_workspace, room).await?;
tracing::info!(nats_url = %resolved_nats_url, "resolved NATS URL");
// Registration only happens against a remote orchestrator; otherwise no
// per-agent connection overrides are supplied.
let agent_auth = match resolve_remote_orchestrator(effective_workspace, room) {
Some((orch_url, bearer)) => {
register_fleet_agents(&orch_url, &bearer, &fleet, agent_filter).await
}
None => std::collections::HashMap::new(),
};
// Shared between the serve options, the self-check task, and the
// shutdown branch below.
let cancel = tokio_util::sync::CancellationToken::new();
let opts = ServeOptions {
nats_url: resolved_nats_url,
agent_auth,
nats_auth: resolve_nats_auth(nats_creds),
agent_filter: agent_filter.map(|v| v.to_vec()),
stream_name: stream_name
.map(|s| s.to_string())
.unwrap_or_else(|| "sphera_jobs".to_string()),
api_prefix: api_prefix
.map(|s| s.to_string())
.unwrap_or_else(|| "sphera".to_string()),
cancel: Some(cancel.clone()),
dashboard_port,
registry: None,
};
if let Some(orch) = try_build_orchestrator_client(effective_workspace, room) {
// Same name filter as registration, so the self-check covers exactly the
// agents this process is serving.
let selected: Vec<String> = fleet
.agents
.iter()
.filter(|a| agent_filter.is_none_or(|f| f.iter().any(|n| n == &a.name)))
.map(|a| a.name.clone())
.collect();
let cancel_sc = cancel.clone();
// Detached one-shot task: run the self-check after the delay, or do
// nothing if the token is cancelled first.
tokio::spawn(async move {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(SELFCHECK_DELAY_SECS)) => {
run_registration_selfcheck(&orch, &selected).await;
}
_ = cancel_sc.cancelled() => {}
}
});
}
tokio::select! {
result = serve_fleet(&fleet, opts) => result,
_ = shutdown_signal() => {
tracing::info!("shutdown signal received; cancelling workers");
cancel.cancel();
// Short fixed pause after cancelling, before returning.
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
Ok(())
}
}
}
/// Completes once the process has been asked to stop.
///
/// On Unix this is whichever of SIGTERM or SIGINT arrives first; on other
/// platforms it is Ctrl-C (an error from the Ctrl-C listener is ignored and
/// also completes the future).
///
/// # Panics
///
/// On Unix, panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::SignalKind;

        let mut term_stream = tokio::signal::unix::signal(SignalKind::terminate())
            .expect("install SIGTERM handler");
        let mut int_stream = tokio::signal::unix::signal(SignalKind::interrupt())
            .expect("install SIGINT handler");
        tokio::select! {
            _ = term_stream.recv() => {}
            _ = int_stream.recv() => {}
        }
    }
    #[cfg(not(unix))]
    {
        tokio::signal::ctrl_c().await.ok();
    }
}
/// Unit tests for config/creds/NATS-URL resolution and the registration
/// self-check classifier. None of these tests perform network I/O.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// An explicit `--config` path is returned verbatim.
    #[test]
    fn resolve_config_path_prefers_explicit_arg() {
        let tmp = TempDir::new().unwrap();
        let p = tmp.path().join("explicit.yml");
        std::fs::write(&p, "providers: {}\nagents: []\n").unwrap();
        let resolved = resolve_config_path(Some(&p)).unwrap();
        assert_eq!(resolved, p);
    }

    /// With no explicit path and no default file present, the error names
    /// `--config` and the default paths.
    #[test]
    fn resolve_config_path_bails_when_nothing_exists() {
        let tmp = TempDir::new().unwrap();
        let cwd = std::env::current_dir().unwrap();
        std::env::set_current_dir(tmp.path()).unwrap();
        // Capture the outcome and restore the process-wide working directory
        // BEFORE asserting anything: a panicking `unwrap_err` would otherwise
        // leave cwd pointing at a temp dir that is about to be deleted.
        let outcome = resolve_config_path(None);
        std::env::set_current_dir(cwd).unwrap();
        let err = outcome.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("--config") && msg.contains("agent.yml"),
            "error must point at --config + default paths; got: {msg}"
        );
    }

    /// An explicit creds path is used as the creds file.
    #[test]
    fn resolve_nats_auth_uses_explicit_creds() {
        let tmp = TempDir::new().unwrap();
        let p = tmp.path().join("custom.creds");
        std::fs::write(&p, "stub").unwrap();
        let auth = resolve_nats_auth(Some(&p)).unwrap();
        assert_eq!(
            auth.creds_file.as_deref(),
            Some(p.display().to_string().as_str())
        );
    }

    /// With `HOME` pointing at an empty directory and no explicit creds,
    /// no auth is produced. Serialized because it mutates `HOME`.
    #[test]
    #[serial_test::serial(home)]
    fn resolve_nats_auth_returns_none_when_no_creds_anywhere() {
        let tmp = TempDir::new().unwrap();
        let prev_home = std::env::var_os("HOME");
        unsafe {
            std::env::set_var("HOME", tmp.path());
        }
        let auth = resolve_nats_auth(None);
        // Restore the previous value before asserting.
        unsafe {
            match prev_home {
                Some(v) => std::env::set_var("HOME", v),
                None => std::env::remove_var("HOME"),
            }
        }
        assert!(auth.is_none());
    }

    /// The `--nats-url` flag is returned without touching the workspace.
    #[tokio::test]
    async fn resolve_nats_url_explicit_flag_wins() {
        let tmp = TempDir::new().unwrap();
        let missing_ws = tmp.path().join("nsed.yaml");
        let resolved = resolve_nats_url(Some("nats://explicit:4222"), None, &missing_ws, None)
            .await
            .expect("explicit URL must short-circuit workspace lookup");
        assert_eq!(resolved, "nats://explicit:4222");
    }

    /// The fleet telemetry URL is used when no flag is given, again without
    /// needing a loadable workspace.
    #[tokio::test]
    async fn resolve_nats_url_fleet_telemetry_used_before_workspace() {
        let tmp = TempDir::new().unwrap();
        let missing_ws = tmp.path().join("nsed.yaml");
        let resolved =
            resolve_nats_url(None, Some("nats://from-telemetry:4222"), &missing_ws, None)
                .await
                .expect("fleet telemetry URL must be used without a loadable workspace");
        assert_eq!(resolved, "nats://from-telemetry:4222");
    }

    /// When both are present, the flag outranks the fleet telemetry URL.
    #[tokio::test]
    async fn resolve_nats_url_flag_beats_fleet_telemetry() {
        let tmp = TempDir::new().unwrap();
        let missing_ws = tmp.path().join("nsed.yaml");
        let resolved = resolve_nats_url(
            Some("nats://explicit:4222"),
            Some("nats://from-telemetry:4222"),
            &missing_ws,
            None,
        )
        .await
        .unwrap();
        assert_eq!(resolved, "nats://explicit:4222");
    }

    /// With no URL source and no workspace, the error names the workspace
    /// file and `--nats-url`, and does not mention a localhost fallback.
    #[tokio::test]
    async fn resolve_nats_url_fails_loud_when_workspace_missing() {
        let tmp = TempDir::new().unwrap();
        let missing_ws = tmp.path().join("nsed.yaml");
        let err = resolve_nats_url(None, None, &missing_ws, None)
            .await
            .unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("nsed.yaml") && msg.contains("--nats-url"),
            "error must name workspace + suggest --nats-url; got: {msg}"
        );
        assert!(
            !msg.contains("localhost"),
            "error must NOT suggest localhost fallback; got: {msg}"
        );
    }

    /// An embedded orchestrator supplies the URL from its `nats_url` field.
    #[tokio::test]
    async fn resolve_nats_url_embedded_uses_inline_field() {
        let tmp = TempDir::new().unwrap();
        let ws_path = tmp.path().join("nsed.yaml");
        std::fs::write(
            &ws_path,
            r#"
orchestrators:
local:
mode: embedded
nats_url: "nats://embedded-host:4222"
policies:
default:
agents: [a, b]
rooms:
main:
policy: default
orchestrator: local
default_room: main
"#,
        )
        .unwrap();
        let resolved = resolve_nats_url(None, None, &ws_path, None).await.unwrap();
        assert_eq!(resolved, "nats://embedded-host:4222");
    }

    /// Builds an `AgentInfo` with only the fields the self-check reads.
    fn agent_info(id: &str, operator: Option<&str>, tags: &[&str]) -> AgentInfo {
        AgentInfo {
            agent_id: id.to_string(),
            operator: operator.map(String::from),
            operator_tags: tags.iter().map(|s| s.to_string()).collect(),
            ..Default::default()
        }
    }

    /// A remote orchestrator resolves to its configured address and token.
    #[test]
    fn resolve_remote_orchestrator_returns_address_and_token() {
        let tmpdir = TempDir::new().unwrap();
        let ws_path = tmpdir.path().join("nsed.yaml");
        std::fs::write(
            &ws_path,
            r#"
orchestrators:
prod:
mode: remote
address: "https://api.example.com"
token: "op-bearer-xyz"
policies:
default:
agents: [justindgx, justindgy]
rooms:
main:
policy: default
orchestrator: prod
default_room: main
"#,
        )
        .unwrap();
        let (url, token) = resolve_remote_orchestrator(&ws_path, None).expect("remote resolves");
        assert_eq!(url, "https://api.example.com");
        assert_eq!(token, "op-bearer-xyz");
    }

    /// An embedded orchestrator never resolves as a remote one.
    #[test]
    fn resolve_remote_orchestrator_none_for_embedded() {
        let tmpdir = TempDir::new().unwrap();
        let ws_path = tmpdir.path().join("nsed.yaml");
        std::fs::write(
            &ws_path,
            r#"
orchestrators:
local:
mode: embedded
nats_url: "nats://x:4222"
policies:
default:
agents: [a, b]
rooms:
main:
policy: default
orchestrator: local
default_room: main
"#,
        )
        .unwrap();
        assert!(resolve_remote_orchestrator(&ws_path, None).is_none());
    }

    /// Both the unified `quorum.yml` format and the legacy `agent.yml` format
    /// load into a fleet config.
    #[test]
    fn load_fleet_unified_reads_quorum_yml_and_legacy_agent_yml() {
        let tmpdir = TempDir::new().unwrap();
        let q = tmpdir.path().join("quorum.yml");
        std::fs::write(
            &q,
            r#"
orchestrators:
prod: { mode: remote, address: "https://x", token: "t" }
policies:
default: { agents: [a, b] }
rooms:
main: { policy: default, orchestrator: prod }
default_room: main
providers:
openai: { type: openai, api_key: "${K}" }
agents:
- name: justindgx
provider_id: openai
model_name: gpt-4o
"#,
        )
        .unwrap();
        let fleet = load_fleet_unified(&q).expect("unified fleet");
        assert_eq!(fleet.agents.len(), 1);
        assert_eq!(fleet.agents[0].name, "justindgx");
        assert!(fleet.providers.contains_key("openai"));
        let a = tmpdir.path().join("agent.yml");
        std::fs::write(
            &a,
            r#"
providers:
openai: { type: openai, api_key: "${K}" }
agents:
- name: legacy-bot
provider_id: openai
model_name: gpt-4o
orchestrators:
- url: "https://x"
bearer_token: "t"
"#,
        )
        .unwrap();
        let fleet = load_fleet_unified(&a).expect("legacy fleet");
        assert_eq!(fleet.agents.len(), 1);
        assert_eq!(fleet.agents[0].name, "legacy-bot");
    }

    /// A blank bearer token short-circuits registration with an empty map.
    #[tokio::test]
    async fn register_fleet_agents_empty_bearer_returns_empty_no_network() {
        let fleet: crate::config::AgentFleetConfig = serde_yaml::from_str("agents: []\n").unwrap();
        let map = register_fleet_agents("https://unused.example", "", &fleet, None).await;
        assert!(map.is_empty());
    }

    /// Operator and tags both present → `Ok`.
    #[test]
    fn selfcheck_ok_when_operator_and_tags_present() {
        let agents = vec![agent_info(
            "justindgx",
            Some("dgx-spark-justin"),
            &["noosphera:x"],
        )];
        assert_eq!(evaluate_registration("justindgx", &agents), RegVerdict::Ok);
    }

    /// Agent id absent from the listing → `Dropped`.
    #[test]
    fn selfcheck_dropped_when_absent_from_orchestrator() {
        let agents = vec![agent_info("other", Some("op"), &["t"])];
        assert_eq!(
            evaluate_registration("justindgx", &agents),
            RegVerdict::Dropped
        );
    }

    /// Missing and empty operators are both `Unattributed`.
    #[test]
    fn selfcheck_unattributed_when_operator_missing_or_empty() {
        let none = vec![agent_info("a", None, &[])];
        assert_eq!(evaluate_registration("a", &none), RegVerdict::Unattributed);
        let empty = vec![agent_info("a", Some(""), &[])];
        assert_eq!(evaluate_registration("a", &empty), RegVerdict::Unattributed);
    }

    /// The literal `local` operator → `LocalFallback`.
    #[test]
    fn selfcheck_local_fallback_when_operator_is_local() {
        let agents = vec![agent_info("a", Some("local"), &[])];
        assert_eq!(
            evaluate_registration("a", &agents),
            RegVerdict::LocalFallback
        );
    }

    /// A real operator with no tags → `NoOperatorTags`.
    #[test]
    fn selfcheck_no_tags_when_operator_set_but_tagless() {
        let agents = vec![agent_info("a", Some("real-op"), &[])];
        assert_eq!(
            evaluate_registration("a", &agents),
            RegVerdict::NoOperatorTags
        );
    }
}