use agentic_config::types::OrchestratorConfig;
use anyhow::Context;
use opencode_rs::Client;
use opencode_rs::server::ManagedServer;
use opencode_rs::server::ServerOptions;
use opencode_rs::types::message::Message;
use opencode_rs::types::message::Part;
use opencode_rs::types::provider::ProviderListResponse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::version;
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
in a nested orchestration session. Consult a human for assistance or try to accomplish your \
task without the orchestration tools.";
pub fn managed_guard_enabled() -> bool {
match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
Ok(v) => v != "0" && !v.trim().is_empty(),
Err(_) => false,
}
}
pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
where
F: FnMut(usize) -> Fut,
Fut: std::future::Future<Output = anyhow::Result<T>>,
{
let mut last_err: Option<anyhow::Error> = None;
for attempt in 1..=2 {
tracing::info!(attempt, "orchestrator server lazy init attempt");
match f(attempt).await {
Ok(v) => {
if attempt > 1 {
tracing::info!(
attempt,
"orchestrator server lazy init succeeded after retry"
);
}
return Ok(v);
}
Err(e) => {
tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
last_err = Some(e);
}
}
}
tracing::error!("orchestrator server lazy init exhausted retries");
match last_err {
Some(e) => Err(e),
None => anyhow::bail!("init_with_retry: unexpected empty error state"),
}
}
pub type ModelKey = (String, String);
pub struct OrchestratorServer {
_managed: Option<ManagedServer>,
client: Client,
model_context_limits: HashMap<ModelKey, u64>,
base_url: String,
config: OrchestratorConfig,
spawned_sessions: Arc<RwLock<HashSet<String>>>,
}
impl OrchestratorServer {
#[allow(clippy::allow_attributes, dead_code)]
pub async fn start() -> anyhow::Result<Arc<Self>> {
Ok(Arc::new(Self::start_impl().await?))
}
pub async fn start_lazy() -> anyhow::Result<Self> {
Self::start_lazy_with_config(None).await
}
pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
if managed_guard_enabled() {
anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
}
init_with_retry(|_attempt| {
let cfg = config_json.clone();
async move { Self::start_impl_with_config(cfg).await }
})
.await
}
async fn start_impl() -> anyhow::Result<Self> {
let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
let config = match agentic_config::loader::load_merged(&cwd) {
Ok(loaded) => {
for w in &loaded.warnings {
tracing::warn!("{w}");
}
loaded.config.orchestrator
}
Err(e) => {
tracing::warn!("Failed to load config, using defaults: {e}");
OrchestratorConfig::default()
}
};
let launcher_config = version::resolve_launcher_config(&cwd)
.context("Failed to resolve OpenCode launcher configuration")?;
tracing::info!(
binary = %launcher_config.binary,
launcher_args = ?launcher_config.launcher_args,
expected_version = %version::PINNED_OPENCODE_VERSION,
"starting embedded opencode serve (pinned stable)"
);
let opts = ServerOptions::default()
.binary(&launcher_config.binary)
.launcher_args(launcher_config.launcher_args)
.directory(cwd.clone());
let managed = ManagedServer::start(opts)
.await
.context("Failed to start embedded `opencode serve`")?;
let base_url = managed.url().to_string().trim_end_matches('/').to_string();
let client = Client::builder()
.base_url(&base_url)
.directory(cwd.to_string_lossy().to_string())
.build()
.context("Failed to build opencode-rs HTTP client")?;
let health = client
.misc()
.health()
.await
.context("Failed to fetch /global/health for version validation")?;
version::validate_exact_version(health.version.as_deref()).with_context(|| {
format!(
"Embedded OpenCode server did not match pinned stable v{} (binary={})",
version::PINNED_OPENCODE_VERSION,
launcher_config.binary
)
})?;
let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
tracing::warn!("Failed to load model limits: {}", e);
HashMap::new()
});
tracing::info!("Loaded {} model context limits", model_context_limits.len());
Ok(Self {
_managed: Some(managed),
client,
model_context_limits,
base_url,
config,
spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
})
}
async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
let config = match agentic_config::loader::load_merged(&cwd) {
Ok(loaded) => {
for w in &loaded.warnings {
tracing::warn!("{w}");
}
loaded.config.orchestrator
}
Err(e) => {
tracing::warn!("Failed to load config, using defaults: {e}");
OrchestratorConfig::default()
}
};
let launcher_config = version::resolve_launcher_config(&cwd)
.context("Failed to resolve OpenCode launcher configuration")?;
tracing::info!(
binary = %launcher_config.binary,
launcher_args = ?launcher_config.launcher_args,
expected_version = %version::PINNED_OPENCODE_VERSION,
config_injected = config_json.is_some(),
"starting embedded opencode serve (pinned stable)"
);
let mut opts = ServerOptions::default()
.binary(&launcher_config.binary)
.launcher_args(launcher_config.launcher_args)
.directory(cwd.clone());
if let Some(cfg) = config_json {
opts = opts.config_json(cfg);
}
let managed = ManagedServer::start(opts)
.await
.context("Failed to start embedded `opencode serve`")?;
let base_url = managed.url().to_string().trim_end_matches('/').to_string();
let client = Client::builder()
.base_url(&base_url)
.directory(cwd.to_string_lossy().to_string())
.build()
.context("Failed to build opencode-rs HTTP client")?;
let health = client
.misc()
.health()
.await
.context("Failed to fetch /global/health for version validation")?;
version::validate_exact_version(health.version.as_deref()).with_context(|| {
format!(
"Embedded OpenCode server did not match pinned stable v{} (binary={})",
version::PINNED_OPENCODE_VERSION,
launcher_config.binary
)
})?;
let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
tracing::warn!("Failed to load model limits: {}", e);
HashMap::new()
});
tracing::info!("Loaded {} model context limits", model_context_limits.len());
Ok(Self {
_managed: Some(managed),
client,
model_context_limits,
base_url,
config,
spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
})
}
pub fn client(&self) -> &Client {
&self.client
}
#[allow(clippy::allow_attributes, dead_code)]
pub fn base_url(&self) -> &str {
&self.base_url
}
pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
self.model_context_limits
.get(&(provider_id.to_string(), model_id.to_string()))
.copied()
}
pub fn session_deadline(&self) -> Duration {
Duration::from_secs(self.config.session_deadline_secs)
}
pub fn inactivity_timeout(&self) -> Duration {
Duration::from_secs(self.config.inactivity_timeout_secs)
}
pub fn compaction_threshold(&self) -> f64 {
self.config.compaction_threshold
}
pub fn spawned_sessions(&self) -> &Arc<RwLock<HashSet<String>>> {
&self.spawned_sessions
}
async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
let resp: ProviderListResponse = client.providers().list().await?;
let mut limits = HashMap::new();
for provider in resp.all {
for (model_id, model) in provider.models {
if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
limits.insert((provider.id.clone(), model_id), limit);
}
}
}
Ok(limits)
}
pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
let text: String = assistant_msg
.parts
.iter()
.filter_map(|p| {
if let Part::Text { text, .. } = p {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join("");
if text.trim().is_empty() {
None
} else {
Some(text)
}
}
}
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
Arc::new(Self::from_client_unshared(client, base_url))
}
pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
Self {
_managed: None,
client,
model_context_limits: HashMap::new(),
base_url: base_url.into().trim_end_matches('/').to_string(),
config: OrchestratorConfig::default(),
spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
#[tokio::test]
async fn init_with_retry_succeeds_on_first_attempt() {
let attempts = AtomicUsize::new(0);
let result: u32 = init_with_retry(|_| {
let n = attempts.fetch_add(1, Ordering::SeqCst);
async move {
assert_eq!(n, 0, "should only be called once on success");
Ok(42)
}
})
.await
.unwrap();
assert_eq!(result, 42);
assert_eq!(attempts.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn init_with_retry_retries_once_and_succeeds() {
let attempts = AtomicUsize::new(0);
let result: u32 = init_with_retry(|_| {
let n = attempts.fetch_add(1, Ordering::SeqCst);
async move {
if n == 0 {
anyhow::bail!("fail first");
}
Ok(42)
}
})
.await
.unwrap();
assert_eq!(result, 42);
assert_eq!(attempts.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn init_with_retry_fails_after_two_attempts() {
let attempts = AtomicUsize::new(0);
let err = init_with_retry::<(), _, _>(|_| {
attempts.fetch_add(1, Ordering::SeqCst);
async { anyhow::bail!("always fail") }
})
.await
.unwrap_err();
assert!(err.to_string().contains("always fail"));
assert_eq!(attempts.load(Ordering::SeqCst), 2);
}
#[test]
#[serial(env)]
fn managed_guard_disabled_when_env_not_set() {
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
assert!(!managed_guard_enabled());
}
#[test]
#[serial(env)]
fn managed_guard_enabled_when_env_is_1() {
unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
assert!(managed_guard_enabled());
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
}
#[test]
#[serial(env)]
fn managed_guard_disabled_when_env_is_0() {
unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
assert!(!managed_guard_enabled());
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
}
#[test]
#[serial(env)]
fn managed_guard_disabled_when_env_is_empty() {
unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
assert!(!managed_guard_enabled());
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
}
#[test]
#[serial(env)]
fn managed_guard_disabled_when_env_is_whitespace() {
unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, " ") };
assert!(!managed_guard_enabled());
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
}
#[test]
#[serial(env)]
fn managed_guard_enabled_when_env_is_truthy() {
unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
assert!(managed_guard_enabled());
unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
}
}