opencode_orchestrator_mcp/
server.rs1use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18
19use crate::version;
20
21pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";
23
24pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
26 in a nested orchestration session. Consult a human for assistance or try to accomplish your \
27 task without the orchestration tools.";
28
29pub fn managed_guard_enabled() -> bool {
31 match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
32 Ok(v) => v != "0" && !v.trim().is_empty(),
33 Err(_) => false,
34 }
35}
36
37pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
39where
40 F: FnMut(usize) -> Fut,
41 Fut: std::future::Future<Output = anyhow::Result<T>>,
42{
43 let mut last_err: Option<anyhow::Error> = None;
44
45 for attempt in 1..=2 {
46 tracing::info!(attempt, "orchestrator server lazy init attempt");
47 match f(attempt).await {
48 Ok(v) => {
49 if attempt > 1 {
50 tracing::info!(
51 attempt,
52 "orchestrator server lazy init succeeded after retry"
53 );
54 }
55 return Ok(v);
56 }
57 Err(e) => {
58 tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
59 last_err = Some(e);
60 }
61 }
62 }
63
64 tracing::error!("orchestrator server lazy init exhausted retries");
65 match last_err {
67 Some(e) => Err(e),
68 None => anyhow::bail!("init_with_retry: unexpected empty error state"),
69 }
70}
71
72pub type ModelKey = (String, String);
74
75pub struct OrchestratorServer {
77 _managed: Option<ManagedServer>,
80 client: Client,
82 model_context_limits: HashMap<ModelKey, u64>,
84 base_url: String,
86 config: OrchestratorConfig,
88 spawned_sessions: Arc<RwLock<HashSet<String>>>,
90}
91
92impl OrchestratorServer {
93 #[allow(clippy::allow_attributes, dead_code)]
102 pub async fn start() -> anyhow::Result<Arc<Self>> {
103 Ok(Arc::new(Self::start_impl().await?))
104 }
105
106 pub async fn start_lazy() -> anyhow::Result<Self> {
116 Self::start_lazy_with_config(None).await
117 }
118
119 pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
130 if managed_guard_enabled() {
131 anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
132 }
133
134 init_with_retry(|_attempt| {
135 let cfg = config_json.clone();
136 async move { Self::start_impl_with_config(cfg).await }
137 })
138 .await
139 }
140
141 async fn start_impl() -> anyhow::Result<Self> {
143 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
144
145 let config = match agentic_config::loader::load_merged(&cwd) {
147 Ok(loaded) => {
148 for w in &loaded.warnings {
149 tracing::warn!("{w}");
150 }
151 loaded.config.orchestrator
152 }
153 Err(e) => {
154 tracing::warn!("Failed to load config, using defaults: {e}");
155 OrchestratorConfig::default()
156 }
157 };
158
159 let launcher_config = version::resolve_launcher_config(&cwd)
160 .context("Failed to resolve OpenCode launcher configuration")?;
161
162 tracing::info!(
163 binary = %launcher_config.binary,
164 launcher_args = ?launcher_config.launcher_args,
165 expected_version = %version::PINNED_OPENCODE_VERSION,
166 "starting embedded opencode serve (pinned stable)"
167 );
168
169 let opts = ServerOptions::default()
170 .binary(&launcher_config.binary)
171 .launcher_args(launcher_config.launcher_args)
172 .directory(cwd.clone());
173
174 let managed = ManagedServer::start(opts)
175 .await
176 .context("Failed to start embedded `opencode serve`")?;
177
178 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
180
181 let client = Client::builder()
182 .base_url(&base_url)
183 .directory(cwd.to_string_lossy().to_string())
184 .build()
185 .context("Failed to build opencode-rs HTTP client")?;
186
187 let health = client
188 .misc()
189 .health()
190 .await
191 .context("Failed to fetch /global/health for version validation")?;
192
193 version::validate_exact_version(health.version.as_deref()).with_context(|| {
194 format!(
195 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
196 version::PINNED_OPENCODE_VERSION,
197 launcher_config.binary
198 )
199 })?;
200
201 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
203 tracing::warn!("Failed to load model limits: {}", e);
204 HashMap::new()
205 });
206
207 tracing::info!("Loaded {} model context limits", model_context_limits.len());
208
209 Ok(Self {
210 _managed: Some(managed),
211 client,
212 model_context_limits,
213 base_url,
214 config,
215 spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
216 })
217 }
218
219 async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
221 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
222
223 let config = match agentic_config::loader::load_merged(&cwd) {
225 Ok(loaded) => {
226 for w in &loaded.warnings {
227 tracing::warn!("{w}");
228 }
229 loaded.config.orchestrator
230 }
231 Err(e) => {
232 tracing::warn!("Failed to load config, using defaults: {e}");
233 OrchestratorConfig::default()
234 }
235 };
236
237 let launcher_config = version::resolve_launcher_config(&cwd)
238 .context("Failed to resolve OpenCode launcher configuration")?;
239
240 tracing::info!(
241 binary = %launcher_config.binary,
242 launcher_args = ?launcher_config.launcher_args,
243 expected_version = %version::PINNED_OPENCODE_VERSION,
244 config_injected = config_json.is_some(),
245 "starting embedded opencode serve (pinned stable)"
246 );
247
248 let mut opts = ServerOptions::default()
249 .binary(&launcher_config.binary)
250 .launcher_args(launcher_config.launcher_args)
251 .directory(cwd.clone());
252
253 if let Some(cfg) = config_json {
255 opts = opts.config_json(cfg);
256 }
257
258 let managed = ManagedServer::start(opts)
259 .await
260 .context("Failed to start embedded `opencode serve`")?;
261
262 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
264
265 let client = Client::builder()
266 .base_url(&base_url)
267 .directory(cwd.to_string_lossy().to_string())
268 .build()
269 .context("Failed to build opencode-rs HTTP client")?;
270
271 let health = client
272 .misc()
273 .health()
274 .await
275 .context("Failed to fetch /global/health for version validation")?;
276
277 version::validate_exact_version(health.version.as_deref()).with_context(|| {
278 format!(
279 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
280 version::PINNED_OPENCODE_VERSION,
281 launcher_config.binary
282 )
283 })?;
284
285 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
287 tracing::warn!("Failed to load model limits: {}", e);
288 HashMap::new()
289 });
290
291 tracing::info!("Loaded {} model context limits", model_context_limits.len());
292
293 Ok(Self {
294 _managed: Some(managed),
295 client,
296 model_context_limits,
297 base_url,
298 config,
299 spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
300 })
301 }
302
303 pub fn client(&self) -> &Client {
305 &self.client
306 }
307
308 #[allow(clippy::allow_attributes, dead_code)]
310 pub fn base_url(&self) -> &str {
311 &self.base_url
312 }
313
314 pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
316 self.model_context_limits
317 .get(&(provider_id.to_string(), model_id.to_string()))
318 .copied()
319 }
320
321 pub fn session_deadline(&self) -> Duration {
323 Duration::from_secs(self.config.session_deadline_secs)
324 }
325
326 pub fn inactivity_timeout(&self) -> Duration {
328 Duration::from_secs(self.config.inactivity_timeout_secs)
329 }
330
331 pub fn compaction_threshold(&self) -> f64 {
333 self.config.compaction_threshold
334 }
335
336 pub fn spawned_sessions(&self) -> &Arc<RwLock<HashSet<String>>> {
338 &self.spawned_sessions
339 }
340
341 async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
343 let resp: ProviderListResponse = client.providers().list().await?;
344 let mut limits = HashMap::new();
345
346 for provider in resp.all {
347 for (model_id, model) in provider.models {
348 if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
349 limits.insert((provider.id.clone(), model_id), limit);
350 }
351 }
352 }
353
354 Ok(limits)
355 }
356
357 pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
359 let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
361
362 let text: String = assistant_msg
364 .parts
365 .iter()
366 .filter_map(|p| {
367 if let Part::Text { text, .. } = p {
368 Some(text.as_str())
369 } else {
370 None
371 }
372 })
373 .collect::<Vec<_>>()
374 .join("");
375
376 if text.trim().is_empty() {
377 None
378 } else {
379 Some(text)
380 }
381 }
382}
383
384#[cfg(feature = "test-support")]
390#[allow(dead_code, clippy::allow_attributes)]
391impl OrchestratorServer {
392 pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
397 Arc::new(Self::from_client_unshared(client, base_url))
398 }
399
400 pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
404 Self {
405 _managed: None,
406 client,
407 model_context_limits: HashMap::new(),
408 base_url: base_url.into().trim_end_matches('/').to_string(),
409 config: OrchestratorConfig::default(),
410 spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
411 }
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418 use serial_test::serial;
419 use std::sync::atomic::AtomicUsize;
420 use std::sync::atomic::Ordering;
421
422 #[tokio::test]
423 async fn init_with_retry_succeeds_on_first_attempt() {
424 let attempts = AtomicUsize::new(0);
425
426 let result: u32 = init_with_retry(|_| {
427 let n = attempts.fetch_add(1, Ordering::SeqCst);
428 async move {
429 assert_eq!(n, 0, "should only be called once on success");
431 Ok(42)
432 }
433 })
434 .await
435 .unwrap();
436
437 assert_eq!(result, 42);
438 assert_eq!(attempts.load(Ordering::SeqCst), 1);
439 }
440
441 #[tokio::test]
442 async fn init_with_retry_retries_once_and_succeeds() {
443 let attempts = AtomicUsize::new(0);
444
445 let result: u32 = init_with_retry(|_| {
446 let n = attempts.fetch_add(1, Ordering::SeqCst);
447 async move {
448 if n == 0 {
449 anyhow::bail!("fail first");
450 }
451 Ok(42)
452 }
453 })
454 .await
455 .unwrap();
456
457 assert_eq!(result, 42);
458 assert_eq!(attempts.load(Ordering::SeqCst), 2);
459 }
460
461 #[tokio::test]
462 async fn init_with_retry_fails_after_two_attempts() {
463 let attempts = AtomicUsize::new(0);
464
465 let err = init_with_retry::<(), _, _>(|_| {
466 attempts.fetch_add(1, Ordering::SeqCst);
467 async { anyhow::bail!("always fail") }
468 })
469 .await
470 .unwrap_err();
471
472 assert!(err.to_string().contains("always fail"));
473 assert_eq!(attempts.load(Ordering::SeqCst), 2);
474 }
475
476 #[test]
477 #[serial(env)]
478 fn managed_guard_disabled_when_env_not_set() {
479 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
482 assert!(!managed_guard_enabled());
483 }
484
485 #[test]
486 #[serial(env)]
487 fn managed_guard_enabled_when_env_is_1() {
488 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
490 assert!(managed_guard_enabled());
491 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
493 }
494
495 #[test]
496 #[serial(env)]
497 fn managed_guard_disabled_when_env_is_0() {
498 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
500 assert!(!managed_guard_enabled());
501 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
503 }
504
505 #[test]
506 #[serial(env)]
507 fn managed_guard_disabled_when_env_is_empty() {
508 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
510 assert!(!managed_guard_enabled());
511 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
513 }
514
515 #[test]
516 #[serial(env)]
517 fn managed_guard_disabled_when_env_is_whitespace() {
518 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, " ") };
520 assert!(!managed_guard_enabled());
521 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
523 }
524
525 #[test]
526 #[serial(env)]
527 fn managed_guard_enabled_when_env_is_truthy() {
528 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
530 assert!(managed_guard_enabled());
531 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
533 }
534}