opencode_orchestrator_mcp/
server.rs1use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::version;
18
/// Environment variable set on child OpenCode sessions spawned by the
/// orchestrator; used to detect (and refuse) nested orchestration.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// Message surfaced to the agent when the managed guard trips. This string is
/// observable behavior (it is returned via `bail!`), so keep it stable.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
    in a nested orchestration session. Consult a human for assistance or try to accomplish your \
    task without the orchestration tools.";

/// Returns `true` when the managed-session guard should block orchestration.
///
/// Semantics: unset, empty, whitespace-only, or a (possibly whitespace-padded)
/// literal `"0"` disable the guard; any other value enables it. Trimming once
/// before both checks fixes the previous inconsistency where `" 0 "` counted
/// as enabled while `" "` counted as disabled.
pub fn managed_guard_enabled() -> bool {
    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
        Ok(v) => {
            let v = v.trim();
            !v.is_empty() && v != "0"
        }
        Err(_) => false,
    }
}
34
35pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
37where
38 F: FnMut(usize) -> Fut,
39 Fut: std::future::Future<Output = anyhow::Result<T>>,
40{
41 let mut last_err: Option<anyhow::Error> = None;
42
43 for attempt in 1..=2 {
44 tracing::info!(attempt, "orchestrator server lazy init attempt");
45 match f(attempt).await {
46 Ok(v) => {
47 if attempt > 1 {
48 tracing::info!(
49 attempt,
50 "orchestrator server lazy init succeeded after retry"
51 );
52 }
53 return Ok(v);
54 }
55 Err(e) => {
56 tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
57 last_err = Some(e);
58 }
59 }
60 }
61
62 tracing::error!("orchestrator server lazy init exhausted retries");
63 match last_err {
65 Some(e) => Err(e),
66 None => anyhow::bail!("init_with_retry: unexpected empty error state"),
67 }
68}
69
/// Key identifying a model: `(provider_id, model_id)`.
pub type ModelKey = (String, String);

/// Handle to an embedded OpenCode server plus the client used to talk to it.
pub struct OrchestratorServer {
    // Keeps the spawned `opencode serve` process alive for the lifetime of
    // this struct; `None` when built around an external client (test support).
    _managed: Option<ManagedServer>,
    // HTTP client bound to the server's base URL.
    client: Client,
    // Context-window limits per (provider_id, model_id), loaded at startup.
    model_context_limits: HashMap<ModelKey, u64>,
    // Server base URL with any trailing '/' removed.
    base_url: String,
    // Orchestrator settings merged from on-disk config (or defaults).
    config: OrchestratorConfig,
}
87
88impl OrchestratorServer {
89 #[allow(clippy::allow_attributes, dead_code)]
98 pub async fn start() -> anyhow::Result<Arc<Self>> {
99 Ok(Arc::new(Self::start_impl().await?))
100 }
101
102 pub async fn start_lazy() -> anyhow::Result<Self> {
112 Self::start_lazy_with_config(None).await
113 }
114
115 pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
126 if managed_guard_enabled() {
127 anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
128 }
129
130 init_with_retry(|_attempt| {
131 let cfg = config_json.clone();
132 async move { Self::start_impl_with_config(cfg).await }
133 })
134 .await
135 }
136
137 async fn start_impl() -> anyhow::Result<Self> {
139 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
140
141 let config = match agentic_config::loader::load_merged(&cwd) {
143 Ok(loaded) => {
144 for w in &loaded.warnings {
145 tracing::warn!("{w}");
146 }
147 loaded.config.orchestrator
148 }
149 Err(e) => {
150 tracing::warn!("Failed to load config, using defaults: {e}");
151 OrchestratorConfig::default()
152 }
153 };
154
155 let launcher_config = version::resolve_launcher_config(&cwd)
156 .context("Failed to resolve OpenCode launcher configuration")?;
157
158 tracing::info!(
159 binary = %launcher_config.binary,
160 launcher_args = ?launcher_config.launcher_args,
161 expected_version = %version::PINNED_OPENCODE_VERSION,
162 "starting embedded opencode serve (pinned stable)"
163 );
164
165 let opts = ServerOptions::default()
166 .binary(&launcher_config.binary)
167 .launcher_args(launcher_config.launcher_args)
168 .directory(cwd.clone());
169
170 let managed = ManagedServer::start(opts)
171 .await
172 .context("Failed to start embedded `opencode serve`")?;
173
174 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
176
177 let client = Client::builder()
178 .base_url(&base_url)
179 .directory(cwd.to_string_lossy().to_string())
180 .build()
181 .context("Failed to build opencode-rs HTTP client")?;
182
183 let health = client
184 .misc()
185 .health()
186 .await
187 .context("Failed to fetch /global/health for version validation")?;
188
189 version::validate_exact_version(health.version.as_deref()).with_context(|| {
190 format!(
191 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
192 version::PINNED_OPENCODE_VERSION,
193 launcher_config.binary
194 )
195 })?;
196
197 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
199 tracing::warn!("Failed to load model limits: {}", e);
200 HashMap::new()
201 });
202
203 tracing::info!("Loaded {} model context limits", model_context_limits.len());
204
205 Ok(Self {
206 _managed: Some(managed),
207 client,
208 model_context_limits,
209 base_url,
210 config,
211 })
212 }
213
214 async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
216 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
217
218 let config = match agentic_config::loader::load_merged(&cwd) {
220 Ok(loaded) => {
221 for w in &loaded.warnings {
222 tracing::warn!("{w}");
223 }
224 loaded.config.orchestrator
225 }
226 Err(e) => {
227 tracing::warn!("Failed to load config, using defaults: {e}");
228 OrchestratorConfig::default()
229 }
230 };
231
232 let launcher_config = version::resolve_launcher_config(&cwd)
233 .context("Failed to resolve OpenCode launcher configuration")?;
234
235 tracing::info!(
236 binary = %launcher_config.binary,
237 launcher_args = ?launcher_config.launcher_args,
238 expected_version = %version::PINNED_OPENCODE_VERSION,
239 config_injected = config_json.is_some(),
240 "starting embedded opencode serve (pinned stable)"
241 );
242
243 let mut opts = ServerOptions::default()
244 .binary(&launcher_config.binary)
245 .launcher_args(launcher_config.launcher_args)
246 .directory(cwd.clone());
247
248 if let Some(cfg) = config_json {
250 opts = opts.config_json(cfg);
251 }
252
253 let managed = ManagedServer::start(opts)
254 .await
255 .context("Failed to start embedded `opencode serve`")?;
256
257 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
259
260 let client = Client::builder()
261 .base_url(&base_url)
262 .directory(cwd.to_string_lossy().to_string())
263 .build()
264 .context("Failed to build opencode-rs HTTP client")?;
265
266 let health = client
267 .misc()
268 .health()
269 .await
270 .context("Failed to fetch /global/health for version validation")?;
271
272 version::validate_exact_version(health.version.as_deref()).with_context(|| {
273 format!(
274 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
275 version::PINNED_OPENCODE_VERSION,
276 launcher_config.binary
277 )
278 })?;
279
280 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
282 tracing::warn!("Failed to load model limits: {}", e);
283 HashMap::new()
284 });
285
286 tracing::info!("Loaded {} model context limits", model_context_limits.len());
287
288 Ok(Self {
289 _managed: Some(managed),
290 client,
291 model_context_limits,
292 base_url,
293 config,
294 })
295 }
296
297 pub fn client(&self) -> &Client {
299 &self.client
300 }
301
302 #[allow(clippy::allow_attributes, dead_code)]
304 pub fn base_url(&self) -> &str {
305 &self.base_url
306 }
307
308 pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
310 self.model_context_limits
311 .get(&(provider_id.to_string(), model_id.to_string()))
312 .copied()
313 }
314
315 pub fn session_deadline(&self) -> Duration {
317 Duration::from_secs(self.config.session_deadline_secs)
318 }
319
320 pub fn inactivity_timeout(&self) -> Duration {
322 Duration::from_secs(self.config.inactivity_timeout_secs)
323 }
324
325 pub fn compaction_threshold(&self) -> f64 {
327 self.config.compaction_threshold
328 }
329
330 async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
332 let resp: ProviderListResponse = client.providers().list().await?;
333 let mut limits = HashMap::new();
334
335 for provider in resp.all {
336 for (model_id, model) in provider.models {
337 if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
338 limits.insert((provider.id.clone(), model_id), limit);
339 }
340 }
341 }
342
343 Ok(limits)
344 }
345
346 pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
348 let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
350
351 let text: String = assistant_msg
353 .parts
354 .iter()
355 .filter_map(|p| {
356 if let Part::Text { text, .. } = p {
357 Some(text.as_str())
358 } else {
359 None
360 }
361 })
362 .collect::<Vec<_>>()
363 .join("");
364
365 if text.trim().is_empty() {
366 None
367 } else {
368 Some(text)
369 }
370 }
371}
372
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Test-support constructor: wraps [`Self::from_client_unshared`] in an
    /// `Arc` for call sites that expect the shared handle shape.
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Test-support constructor around an externally managed server: no child
    /// process is held, model limits are empty, and config is the default.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        let raw = base_url.into();
        // Match the normalization done by the real startup path.
        let normalized = raw.trim_end_matches('/').to_string();
        Self {
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            base_url: normalized,
            config: OrchestratorConfig::default(),
        }
    }
}
402
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    // --- init_with_retry ---

    #[test]
    fn placeholder_removed() {}

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        // Counts closure invocations so we can prove no retry happened.
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                assert_eq!(n, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let attempts = AtomicUsize::new(0);

        // First call fails, second succeeds: the retry path must recover.
        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                if n == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let attempts = AtomicUsize::new(0);

        // Both attempts fail: the final error must be the last one produced,
        // and exactly two attempts must have been made.
        let err = init_with_retry::<(), _, _>(|_| {
            attempts.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        assert!(err.to_string().contains("always fail"));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // --- managed_guard_enabled ---
    //
    // These tests mutate process-global environment state, so they are
    // serialized via `#[serial(env)]` and each test restores the variable.
    // `set_var`/`remove_var` are `unsafe` in edition 2024 because they are
    // not thread-safe; serialization keeps the usage sound here.

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_not_set() {
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
        assert!(!managed_guard_enabled());
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_1() {
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
        assert!(managed_guard_enabled());
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_0() {
        // "0" is the explicit opt-out value.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
        assert!(!managed_guard_enabled());
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_empty() {
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
        assert!(!managed_guard_enabled());
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_whitespace() {
        // Whitespace-only values are treated the same as empty.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "   ") };
        assert!(!managed_guard_enabled());
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_truthy() {
        // Any non-empty value other than "0" enables the guard.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
        assert!(managed_guard_enabled());
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }
}