use crate::agents::config::{AgentConfig, BuiltinToolGrant};
use crate::config::{AgentFleetConfig, load_agent_from_config_with_registry, resolve_agent_names};
use crate::multi_agent::MultiAgentRunner;
use crate::nats_utils::NatsAuth;
use crate::providers::ProviderRegistry;
use crate::tools::{ScopedGrepTool, ScopedReadFileTool, Tool};
use crate::workers::{NatsNsedWorker, WorkerConfig};
use anyhow::{Context, Result};
use std::sync::Arc;
use tracing::{info, warn};
/// Runtime knobs consumed by [`serve_fleet`].
///
/// Every field has a default (see the `Default` impl), so callers normally
/// use struct-update syntax and override only what they need.
#[derive(Debug, Clone)]
pub struct ServeOptions {
    /// NATS server URL handed to every worker's configuration.
    pub nats_url: String,
    /// Optional NATS credentials; when present they are cloned into each
    /// worker's configuration.
    pub nats_auth: Option<NatsAuth>,
    /// Agent names to run. `None` selects the whole fleet (the literal
    /// selector `"ALL"` is passed to the name resolver).
    pub agent_filter: Option<Vec<String>>,
    /// Stream name passed to every worker's configuration.
    pub stream_name: String,
    /// API prefix applied to every worker's configuration.
    pub api_prefix: String,
    /// Cancellation token the runner observes. `None` makes `serve_fleet`
    /// create a fresh, un-cancelled token.
    pub cancel: Option<tokio_util::sync::CancellationToken>,
    /// Dashboard port override; takes precedence over the fleet config's
    /// `dashboard_port` when both are set.
    pub dashboard_port: Option<u16>,
    /// Provider registry used to build agents. `None` falls back to
    /// `ProviderRegistry::with_builtins()`.
    pub registry: Option<Arc<ProviderRegistry>>,
}
impl Default for ServeOptions {
    /// Defaults target a local NATS server on port 4222 with the `sphera`
    /// stream/prefix names; every optional field starts out unset.
    fn default() -> Self {
        let nats_url = String::from("nats://localhost:4222");
        let stream_name = String::from("sphera_jobs");
        let api_prefix = String::from("sphera");
        Self {
            nats_url,
            nats_auth: None,
            agent_filter: None,
            stream_name,
            api_prefix,
            cancel: None,
            dashboard_port: None,
            registry: None,
        }
    }
}
/// Returns `url` with any userinfo component (`user`, `user:password`, or a
/// bare token before `@`) replaced by `<redacted>`, so the value is safe to log.
///
/// * Input without a `scheme://` prefix, or without `@` in its authority, is
///   returned unchanged.
/// * Only the authority (the text between `://` and the first `/`) is
///   inspected, so an `@` inside the path is left alone.
/// * A comma-separated list of URLs is redacted entry by entry. Previously
///   only the first entry was examined and credentials embedded in later
///   entries were returned verbatim.
///   NOTE(review): comma-separated server lists are a common NATS convention;
///   confirm the client used here accepts them.
fn redact_userinfo(url: &str) -> String {
    let mut redacted = String::with_capacity(url.len());
    let mut remaining = url;
    // Peel off one list entry per iteration; whatever is left after the last
    // boundary (or the whole input, for a single URL) is handled after the loop.
    while let Some(comma) = next_entry_boundary(remaining) {
        push_redacted_entry(&remaining[..comma], &mut redacted);
        redacted.push(',');
        remaining = &remaining[comma + 1..];
    }
    push_redacted_entry(remaining, &mut redacted);
    redacted
}

/// Finds the byte index of the first comma that is immediately followed by a
/// `scheme://` prefix, i.e. a comma that separates two URLs.
///
/// Commas that are not followed by a scheme (for example one inside a
/// password) are not treated as separators, so such input is still redacted
/// as a single URL.
fn next_entry_boundary(list: &str) -> Option<usize> {
    list.match_indices(',')
        .map(|(idx, _)| idx)
        .find(|&idx| starts_with_scheme(&list[idx + 1..]))
}

/// Reports whether `s` begins with a URI scheme followed by `://`
/// (a letter, then letters, digits, `+`, `-` or `.`).
fn starts_with_scheme(s: &str) -> bool {
    match s.split_once("://") {
        Some((scheme, _)) => {
            let mut chars = scheme.chars();
            matches!(chars.next(), Some(first) if first.is_ascii_alphabetic())
                && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
        }
        None => false,
    }
}

/// Appends one URL to `out`, masking its userinfo if it has any.
fn push_redacted_entry(entry: &str, out: &mut String) {
    let (scheme, rest) = match entry.split_once("://") {
        Some(pair) => pair,
        None => {
            out.push_str(entry);
            return;
        }
    };
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let (authority, path) = rest.split_at(authority_end);
    // Split on the LAST `@` so that an `@` inside the password is masked too.
    match authority.rsplit_once('@') {
        Some((_userinfo, host)) => {
            out.push_str(scheme);
            out.push_str("://<redacted>@");
            out.push_str(host);
            out.push_str(path);
        }
        None => out.push_str(entry),
    }
}
pub(crate) fn instantiate_builtin_tools(
agent_config: &AgentConfig,
) -> Result<Vec<Box<dyn Tool>>, String> {
let mut tools: Vec<Box<dyn Tool>> = Vec::new();
for grant in &agent_config.builtin_tools {
match grant {
BuiltinToolGrant::ReadFile { roots, max_bytes } => {
let root_paths: Vec<std::path::PathBuf> =
roots.iter().map(std::path::PathBuf::from).collect();
let tool = ScopedReadFileTool::new(agent_config.name.clone(), &root_paths)
.with_max_bytes(*max_bytes as u64);
tools.push(Box::new(tool));
}
BuiltinToolGrant::Grep {
roots,
max_bytes,
max_results,
timeout_secs,
} => {
let tool = ScopedGrepTool::new(
agent_config.name.clone(),
roots,
*max_bytes,
*max_results,
*timeout_secs,
)?;
tools.push(Box::new(tool));
}
other => {
return Err(format!(
"builtin tool variant `{other:?}` is not supported by `quorum serve` yet \
(likely PdfQuery — impl lives in the BUSL nsed-agent crate, port pending). \
Run `nsed serve` for this agent until the port lands."
));
}
}
}
Ok(tools)
}
/// Loads the agent called `agent_name` from `fleet` and wraps it in a
/// [`NatsNsedWorker`].
///
/// The worker's consumer is named `agent_<name>` after the loaded agent
/// config, and `nats_auth`, when given, is cloned into the worker config.
///
/// Returns `Ok(None)` when `registry.build_agent` yields no agent for this
/// configuration. That check happens before the worker itself is
/// constructed.
///
/// # Errors
///
/// Fails if the agent cannot be loaded from the fleet config, if the registry
/// reports an error while building the agent, or if constructing the worker
/// fails.
pub async fn build_worker(
    fleet: &AgentFleetConfig,
    agent_name: &str,
    nats_url: &str,
    nats_auth: Option<&NatsAuth>,
    stream_name: &str,
    api_prefix: &str,
    registry: &ProviderRegistry,
) -> Result<Option<(NatsNsedWorker, AgentConfig)>> {
    let (agent_config, provider) =
        load_agent_from_config_with_registry(fleet, agent_name, registry)
            .with_context(|| format!("failed to load agent '{agent_name}' from fleet config"))?;

    let base_config = WorkerConfig::new(
        nats_url.to_owned(),
        stream_name.to_owned(),
        format!("agent_{}", agent_config.name),
    )
    .with_api_prefix(api_prefix.to_owned());
    let worker_config = match nats_auth {
        Some(auth) => base_config.with_nats_auth(auth.clone()),
        None => base_config,
    };

    let built = registry.build_agent(&provider.provider_type, &agent_config, &provider)?;
    let agent = if let Some(agent) = built {
        agent
    } else {
        // Nothing to run for this entry; the caller decides how to report it.
        return Ok(None);
    };

    // The config is cloned because the caller gets its own copy back
    // alongside the worker.
    let worker =
        NatsNsedWorker::from_dyn_agent(agent, agent_config.clone(), worker_config, None).await?;
    Ok(Some((worker, agent_config)))
}
/// Chooses the dashboard port: `opt_port` wins when set, otherwise
/// `fleet_port` is used. Returns `None` when neither is set.
fn resolve_dashboard_port(opt_port: Option<u16>, fleet_port: Option<u16>) -> Option<u16> {
    match opt_port {
        Some(explicit) => Some(explicit),
        None => fleet_port,
    }
}
/// Builds a worker for every selected agent in `fleet` and runs them all
/// until the cancellation token fires.
///
/// Steps, in order:
/// 1. Resolve the agent names from `opts.agent_filter` (or the whole fleet
///    when no filter is given).
/// 2. Pick the provider registry: `opts.registry`, or the built-in one.
/// 3. Enable the dashboard if a port is configured and the `status-server`
///    feature is compiled in; otherwise warn that the port is ignored.
/// 4. Build one worker per agent. An agent that fails to build, or for which
///    no agent is produced, is skipped rather than aborting the fleet.
/// 5. Hand control to the runner with `opts.cancel`, or a fresh token.
///
/// # Errors
///
/// * No agent names were resolved.
/// * Every agent failed to build or was skipped.
/// * Whatever the runner itself returns.
pub async fn serve_fleet(fleet: &AgentFleetConfig, opts: ServeOptions) -> Result<()> {
    // The resolver takes a single selector string: the requested names joined
    // with commas, or the literal "ALL" when no filter was supplied.
    let filter = opts
        .agent_filter
        .as_ref()
        .map(|v| v.join(","))
        .unwrap_or_else(|| "ALL".to_string());
    let names = resolve_agent_names(&filter, fleet);
    if names.is_empty() {
        anyhow::bail!(
            "no agents to run — fleet config has {} agents but `agent_filter` matched none",
            fleet.agents.len()
        );
    }
    info!(
        agent_count = names.len(),
        // Never log credentials embedded in the URL.
        nats_url = %redact_userinfo(&opts.nats_url),
        "starting fleet"
    );
    let registry = opts
        .registry
        .clone()
        .unwrap_or_else(|| Arc::new(ProviderRegistry::with_builtins()));
    let mut runner = MultiAgentRunner::new();
    if let Some(port) = resolve_dashboard_port(opts.dashboard_port, fleet.dashboard_port) {
        #[cfg(feature = "status-server")]
        {
            info!(dashboard_port = port, "enabling unified dashboard");
            runner.enable_dashboard(port);
        }
        #[cfg(not(feature = "status-server"))]
        {
            warn!(
                dashboard_port = port,
                "dashboard_port set but `status-server` feature not compiled in — no dashboard will start. \
                 Rebuild with `--features status-server` (or run a build that has it in `default`)."
            );
            let _ = port;
        }
    }
    for name in &names {
        match build_worker(
            fleet,
            name,
            &opts.nats_url,
            opts.nats_auth.as_ref(),
            &opts.stream_name,
            &opts.api_prefix,
            &registry,
        )
        .await
        {
            Ok(Some((worker, agent_config))) => {
                info!(agent = %name, "agent ready");
                runner.add_worker(name.clone(), worker, agent_config);
            }
            Ok(None) => {
                // `build_worker` returns `Ok(None)` when the registry produced
                // no agent for this entry. Record which entry was dropped so
                // a shrunken fleet can be diagnosed from the logs.
                tracing::debug!(agent = %name, "agent skipped: registry produced no agent for it");
            }
            Err(e) => {
                // One broken entry must not take the rest of the fleet down.
                warn!(agent = %name, "failed to build agent: {e:#}, skipping");
            }
        }
    }
    if runner.is_empty() {
        anyhow::bail!(
            "no agents successfully started from fleet config (every entry failed or was skipped)"
        );
    }
    let cancel = opts.cancel.unwrap_or_default();
    runner.run_with_cancellation(cancel).await
}
/// Installs a formatting `tracing` subscriber as the global default.
///
/// The filter comes from the environment when `EnvFilter` can read one;
/// otherwise it falls back to `info,quorum_rs=info,async_nats=warn`.
/// The result of `try_init` is deliberately discarded, so a failed install
/// is not reported as an error.
#[doc(hidden)]
pub fn install_default_tracing() {
    use tracing_subscriber::EnvFilter;

    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info,quorum_rs=info,async_nats=warn"),
    };
    let builder = tracing_subscriber::fmt().with_env_filter(env_filter);
    let _ = builder.try_init();
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ProviderEntry;

    /// Parses an inline YAML fleet fixture, panicking if the fixture is malformed.
    fn fleet_yaml(s: &str) -> AgentFleetConfig {
        serde_yaml::from_str(s).expect("fleet yaml must parse")
    }

    /// A filter naming an agent that is not in the fleet ends in the
    /// "no agents successfully started" error.
    #[tokio::test]
    async fn serve_fleet_rejects_filter_with_unknown_agent() {
        let fleet = fleet_yaml(
            r#"
providers:
  openai:
    type: openai
    base_url: "http://localhost:9999/v1"
    api_key: "sk-test"
agents:
  - name: cortex-a
    provider_id: openai
    model_name: gpt-4o
"#,
        );
        let opts = ServeOptions {
            agent_filter: Some(vec!["does-not-exist".to_string()]),
            ..Default::default()
        };
        let err = serve_fleet(&fleet, opts).await.unwrap_err();
        assert!(
            err.to_string().contains("no agents successfully started"),
            "must surface no-buildable-agents error; got: {err}"
        );
    }

    /// A fleet with no agents at all fails early with "no agents to run".
    #[tokio::test]
    async fn serve_fleet_rejects_empty_fleet() {
        let fleet = fleet_yaml("providers: {}\nagents: []\n");
        let err = serve_fleet(&fleet, ServeOptions::default())
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("no agents to run"),
            "must surface empty-fleet error; got: {err}"
        );
    }

    /// Pins the documented defaults of `ServeOptions`.
    #[test]
    fn default_serve_options_target_localhost() {
        let opts = ServeOptions::default();
        assert_eq!(opts.nats_url, "nats://localhost:4222");
        assert!(opts.nats_auth.is_none());
        assert!(opts.agent_filter.is_none());
        assert_eq!(opts.stream_name, "sphera_jobs");
        assert_eq!(opts.api_prefix, "sphera");
    }

    /// An `exec` provider whose agent has no exec section yields `Ok(None)`
    /// instead of an error, and does so before any NATS connection is tried.
    #[tokio::test]
    async fn build_worker_skips_exec_without_exec_section() {
        let fleet = fleet_yaml(
            r#"
providers:
  exec_local:
    type: exec
agents:
  - name: broken
    provider_id: exec_local
    model_name: custom
"#,
        );
        let result = build_worker(
            &fleet,
            "broken",
            "nats://localhost:0",
            None,
            "sphera_jobs",
            "sphera",
            &ProviderRegistry::with_builtins(),
        )
        .await;
        assert!(
            matches!(result, Ok(None)),
            "exec provider with no exec section must skip cleanly (Ok(None)) before NATS connect; \
             if NATS connection was attempted it would have errored on the unbindable port"
        );
    }

    /// Both `user:pass@` and bare `token@` userinfo are masked.
    #[test]
    fn redact_userinfo_strips_credentials() {
        assert_eq!(
            redact_userinfo("nats://user:pass@example.com:4222"),
            "nats://<redacted>@example.com:4222"
        );
        assert_eq!(
            redact_userinfo("nats://token@example.com:4222"),
            "nats://<redacted>@example.com:4222"
        );
    }

    /// URLs without userinfo must come back byte-for-byte identical.
    #[test]
    fn redact_userinfo_leaves_credential_free_urls_alone() {
        for url in [
            "nats://localhost:4222",
            "nats://api.peeramid.xyz:4222",
            "nats://10.0.0.1:4222",
        ] {
            assert_eq!(redact_userinfo(url), url, "{url} must round-trip");
        }
    }

    /// An `@` that appears only in the path is not userinfo and is kept.
    #[test]
    fn redact_userinfo_ignores_at_sign_in_path() {
        assert_eq!(
            redact_userinfo("nats://example.com:4222/some@path"),
            "nats://example.com:4222/some@path"
        );
    }

    /// Input that is not a URL (including the empty string) is returned as-is.
    #[test]
    fn redact_userinfo_handles_non_url_input() {
        assert_eq!(redact_userinfo("not a url"), "not a url");
        assert_eq!(redact_userinfo(""), "");
    }

    /// With an already-cancelled token and a fleet whose only agent is
    /// skipped, the empty-runner error is what surfaces.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn serve_fleet_honours_pre_cancelled_token() {
        let fleet = fleet_yaml(
            r#"
providers:
  exec_local:
    type: exec
agents:
  - name: noop
    provider_id: exec_local
    model_name: custom
"#,
        );
        let token = tokio_util::sync::CancellationToken::new();
        token.cancel();
        let opts = ServeOptions {
            cancel: Some(token),
            ..Default::default()
        };
        let err = serve_fleet(&fleet, opts).await.unwrap_err();
        assert!(
            err.to_string().contains("no agents successfully started"),
            "must bail with the empty-runner error, not a cancel error: {err}"
        );
    }

    /// Guards the `ProviderEntry` field names and YAML keys this module relies on.
    #[test]
    fn provider_entry_has_expected_fields() {
        let p: ProviderEntry = serde_yaml::from_str(
            r#"
type: openai
base_url: "http://localhost:9999/v1"
api_key: "sk-test"
"#,
        )
        .expect("provider yaml must parse");
        assert_eq!(p.provider_type, "openai");
        assert_eq!(p.base_url, "http://localhost:9999/v1");
        assert_eq!(p.api_key, "sk-test");
        assert!(p.models.is_empty());
    }

    /// The explicit option beats the fleet config's port.
    #[test]
    fn resolve_dashboard_port_cli_flag_wins() {
        assert_eq!(
            super::resolve_dashboard_port(Some(8081), Some(9090)),
            Some(8081)
        );
    }

    /// Without an explicit option, the fleet config's port is used.
    #[test]
    fn resolve_dashboard_port_falls_back_to_fleet() {
        assert_eq!(super::resolve_dashboard_port(None, Some(9090)), Some(9090));
    }

    /// With neither source set there is no dashboard port.
    #[test]
    fn resolve_dashboard_port_returns_none_when_both_absent() {
        assert_eq!(super::resolve_dashboard_port(None, None), None);
    }

    /// `dashboard_port` is deserialized from the fleet YAML.
    #[test]
    fn fleet_yaml_carries_dashboard_port() {
        let yaml = "providers: {}\nagents: []\ndashboard_port: 8081\n";
        let cfg: AgentFleetConfig =
            serde_yaml::from_str(yaml).expect("fleet yaml must parse with dashboard_port");
        assert_eq!(cfg.dashboard_port, Some(8081));
    }

    /// Builds an otherwise-default agent config carrying the given grants.
    fn agent_with_grants(name: &str, grants: Vec<BuiltinToolGrant>) -> AgentConfig {
        AgentConfig {
            name: name.to_string(),
            builtin_tools: grants,
            ..Default::default()
        }
    }

    /// One `ReadFile` and one `Grep` grant on a real directory yield two tools.
    #[test]
    fn instantiate_builtin_tools_wires_read_and_grep() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().display().to_string();
        let grants = vec![
            BuiltinToolGrant::ReadFile {
                roots: vec![root.clone()],
                max_bytes: 1024,
            },
            BuiltinToolGrant::Grep {
                roots: vec![root],
                max_bytes: 1024,
                max_results: 10,
                timeout_secs: 5,
            },
        ];
        let cfg = agent_with_grants("test-agent", grants);
        let tools = super::instantiate_builtin_tools(&cfg).expect("both grants must instantiate");
        assert_eq!(tools.len(), 2, "expected one tool per grant");
    }

    /// No grants means no tools, and no error.
    #[test]
    fn instantiate_builtin_tools_empty_grants_returns_empty() {
        let cfg = agent_with_grants("test-agent", vec![]);
        let tools = super::instantiate_builtin_tools(&cfg).expect("empty grants must succeed");
        assert!(tools.is_empty());
    }

    /// An unsupported grant kind is rejected with a message that names the
    /// variant and points at `nsed serve`.
    #[test]
    fn instantiate_builtin_tools_pdf_query_returns_err() {
        let grants = vec![BuiltinToolGrant::PdfQuery {
            trees_root: "/tmp".into(),
            script_path: "/tmp/x".into(),
            python_bin: "python3".into(),
            max_bytes: 1024,
            max_results: 10,
            timeout_secs: 5,
        }];
        let cfg = agent_with_grants("test-agent", grants);
        let err = super::instantiate_builtin_tools(&cfg).unwrap_err();
        assert!(
            err.contains("PdfQuery") && err.contains("nsed serve"),
            "error must name the variant + redirect to nsed serve; got: {err}"
        );
    }

    /// A `Grep` grant rooted at a non-existent path fails with an error that
    /// identifies the bad root.
    #[test]
    fn instantiate_builtin_tools_grep_bad_root_returns_err() {
        let grants = vec![BuiltinToolGrant::Grep {
            roots: vec!["/path/that/does/not/exist/12345".into()],
            max_bytes: 1024,
            max_results: 10,
            timeout_secs: 5,
        }];
        let cfg = agent_with_grants("test-agent", grants);
        let err = super::instantiate_builtin_tools(&cfg).unwrap_err();
        assert!(
            err.to_lowercase().contains("canonicalize")
                || err.to_lowercase().contains("not found")
                || err.contains("/path/that/does/not/exist"),
            "error must mention the bad root; got: {err}"
        );
    }
}