opencode_orchestrator_mcp/
server.rs1use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::version;
18
/// Environment variable marking a process as spawned (managed) by the
/// orchestrator; used to detect and refuse nested orchestration sessions.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// Error message surfaced when the orchestration tools are invoked from a
/// session that is itself orchestrator-managed (see `managed_guard_enabled`).
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
    in a nested orchestration session. Consult a human for assistance or try to accomplish your \
    task without the orchestration tools.";
26
27pub fn managed_guard_enabled() -> bool {
29 match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
30 Ok(v) => v != "0" && !v.trim().is_empty(),
31 Err(_) => false,
32 }
33}
34
35pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
37where
38 F: FnMut(usize) -> Fut,
39 Fut: std::future::Future<Output = anyhow::Result<T>>,
40{
41 let mut last_err: Option<anyhow::Error> = None;
42
43 for attempt in 1..=2 {
44 tracing::info!(attempt, "orchestrator server lazy init attempt");
45 match f(attempt).await {
46 Ok(v) => {
47 if attempt > 1 {
48 tracing::info!(
49 attempt,
50 "orchestrator server lazy init succeeded after retry"
51 );
52 }
53 return Ok(v);
54 }
55 Err(e) => {
56 tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
57 last_err = Some(e);
58 }
59 }
60 }
61
62 tracing::error!("orchestrator server lazy init exhausted retries");
63 match last_err {
65 Some(e) => Err(e),
66 None => anyhow::bail!("init_with_retry: unexpected empty error state"),
67 }
68}
69
/// Key identifying a model as `(provider_id, model_id)`.
pub type ModelKey = (String, String);

/// Handle to an embedded `opencode serve` instance plus the HTTP client used
/// to talk to it, along with orchestrator configuration and cached model data.
pub struct OrchestratorServer {
    // Keeps the spawned server process alive for the lifetime of this struct;
    // `None` when the client targets an externally managed server (see the
    // `test-support` constructors).
    _managed: Option<ManagedServer>,
    // HTTP client bound to `base_url`.
    client: Client,
    // Cached (provider_id, model_id) -> context token limit map, loaded from
    // the server's provider list at startup; may be empty if loading failed.
    model_context_limits: HashMap<ModelKey, u64>,
    // Base URL of the server with any trailing '/' stripped.
    base_url: String,
    // Orchestrator configuration, merged from config files or defaulted.
    config: OrchestratorConfig,
}
87
impl OrchestratorServer {
    /// Starts the embedded server eagerly and returns a shared handle.
    ///
    /// Unlike [`Self::start_lazy`], this performs no managed-guard check and
    /// no retry.
    #[allow(clippy::allow_attributes, dead_code)]
    pub async fn start() -> anyhow::Result<Arc<Self>> {
        Ok(Arc::new(Self::start_impl().await?))
    }

    /// Starts the embedded server for lazy initialization.
    ///
    /// Bails with [`ORCHESTRATOR_MANAGED_GUARD_MESSAGE`] when this process is
    /// itself orchestrator-managed (nested orchestration); otherwise retries
    /// startup once via [`init_with_retry`].
    pub async fn start_lazy() -> anyhow::Result<Self> {
        if managed_guard_enabled() {
            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
        }

        init_with_retry(|_attempt| async { Self::start_impl().await }).await
    }

    /// Actual startup sequence: load config -> resolve launcher -> spawn
    /// `opencode serve` -> build client -> health/version check -> prefetch
    /// model context limits.
    async fn start_impl() -> anyhow::Result<Self> {
        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;

        // Config loading is best-effort: warnings are logged, and a failure
        // falls back to defaults rather than aborting startup.
        let config = match agentic_config::loader::load_merged(&cwd) {
            Ok(loaded) => {
                for w in &loaded.warnings {
                    tracing::warn!("{w}");
                }
                loaded.config.orchestrator
            }
            Err(e) => {
                tracing::warn!("Failed to load config, using defaults: {e}");
                OrchestratorConfig::default()
            }
        };

        let launcher_config = version::resolve_launcher_config(&cwd)
            .context("Failed to resolve OpenCode launcher configuration")?;

        tracing::info!(
            binary = %launcher_config.binary,
            launcher_args = ?launcher_config.launcher_args,
            expected_version = %version::PINNED_OPENCODE_VERSION,
            "starting embedded opencode serve (pinned stable)"
        );

        let opts = ServerOptions::default()
            .binary(&launcher_config.binary)
            .launcher_args(launcher_config.launcher_args)
            .directory(cwd.clone());

        let managed = ManagedServer::start(opts)
            .await
            .context("Failed to start embedded `opencode serve`")?;

        // Normalize the URL so later path concatenation can't produce "//".
        let base_url = managed.url().to_string().trim_end_matches('/').to_string();

        let client = Client::builder()
            .base_url(&base_url)
            .directory(cwd.to_string_lossy().to_string())
            .build()
            .context("Failed to build opencode-rs HTTP client")?;

        // The health endpoint doubles as the source of the server version.
        let health = client
            .misc()
            .health()
            .await
            .context("Failed to fetch /global/health for version validation")?;

        // Refuse to run against anything but the pinned stable version.
        version::validate_exact_version(health.version.as_deref()).with_context(|| {
            format!(
                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
                version::PINNED_OPENCODE_VERSION,
                launcher_config.binary
            )
        })?;

        // Model limits are an optimization; a failure degrades to an empty
        // map (context_limit() then returns None) instead of failing startup.
        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
            tracing::warn!("Failed to load model limits: {}", e);
            HashMap::new()
        });

        tracing::info!("Loaded {} model context limits", model_context_limits.len());

        Ok(Self {
            _managed: Some(managed),
            client,
            model_context_limits,
            base_url,
            config,
        })
    }

    /// Borrows the HTTP client bound to the embedded server.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Base URL of the embedded server (no trailing slash).
    #[allow(clippy::allow_attributes, dead_code)]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }

    /// Looks up the cached context token limit for `(provider_id, model_id)`.
    ///
    /// Returns `None` when the model is unknown or limits failed to load.
    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
        self.model_context_limits
            .get(&(provider_id.to_string(), model_id.to_string()))
            .copied()
    }

    /// Overall session deadline from configuration.
    pub fn session_deadline(&self) -> Duration {
        Duration::from_secs(self.config.session_deadline_secs)
    }

    /// Inactivity timeout from configuration.
    pub fn inactivity_timeout(&self) -> Duration {
        Duration::from_secs(self.config.inactivity_timeout_secs)
    }

    /// Compaction threshold from configuration.
    pub fn compaction_threshold(&self) -> f64 {
        self.config.compaction_threshold
    }

    /// Fetches the provider list and builds the
    /// `(provider_id, model_id) -> context limit` map, skipping models that
    /// report no context limit.
    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
        let resp: ProviderListResponse = client.providers().list().await?;
        let mut limits = HashMap::new();

        for provider in resp.all {
            for (model_id, model) in provider.models {
                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
                    limits.insert((provider.id.clone(), model_id), limit);
                }
            }
        }

        Ok(limits)
    }

    /// Concatenates the text parts of the most recent assistant message.
    ///
    /// Returns `None` when there is no assistant message or the combined text
    /// is empty/whitespace-only.
    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
        // Scan from the end so the *latest* assistant message wins.
        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;

        let text: String = assistant_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let Part::Text { text, .. } = p {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
            .join("");

        if text.trim().is_empty() {
            None
        } else {
            Some(text)
        }
    }
}
271
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Builds a shared test-only handle around an existing client.
    ///
    /// No embedded `opencode serve` process is spawned; the target server is
    /// assumed to be managed externally (e.g. by a test harness).
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Builds an unshared test-only handle around an existing client.
    ///
    /// Model context limits start empty and the orchestrator configuration
    /// falls back to defaults; trailing slashes are stripped from `base_url`.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        let raw = base_url.into();
        let normalized = raw.trim_end_matches('/').to_string();
        Self {
            base_url: normalized,
            client,
            config: OrchestratorConfig::default(),
            model_context_limits: HashMap::new(),
            _managed: None,
        }
    }
}
301
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    /// Sets the managed-guard env var. Callers must be `#[serial(env)]`.
    fn set_guard_env(value: &str) {
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, value) };
    }

    /// Removes the managed-guard env var. Callers must be `#[serial(env)]`.
    fn clear_guard_env() {
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let calls = AtomicUsize::new(0);

        let value: u32 = init_with_retry(|_| {
            let prior = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                assert_eq!(prior, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let calls = AtomicUsize::new(0);

        let value: u32 = init_with_retry(|_| {
            let prior = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                if prior == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let calls = AtomicUsize::new(0);

        let failure = init_with_retry::<(), _, _>(|_| {
            calls.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        assert!(failure.to_string().contains("always fail"));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_not_set() {
        clear_guard_env();
        assert!(!managed_guard_enabled());
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_1() {
        set_guard_env("1");
        assert!(managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_0() {
        set_guard_env("0");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_empty() {
        set_guard_env("");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_whitespace() {
        set_guard_env(" ");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_truthy() {
        set_guard_env("true");
        assert!(managed_guard_enabled());
        clear_guard_env();
    }
}