// opencode_orchestrator_mcp/server.rs
use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Env var set in sessions spawned by the orchestrator, used to detect nesting.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// Error message returned when a nested orchestration session is refused.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
    in a nested orchestration session. Consult a human for assistance or try to accomplish your \
    task without the orchestration tools.";

/// Returns `true` when the managed-guard env var is set to a non-empty,
/// non-`"0"` value (after trimming whitespace).
///
/// Trimming happens once, before both checks, so `"  "` and `" 0 "` are
/// treated as disabled — the same as `""` and `"0"`.
pub fn managed_guard_enabled() -> bool {
    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
        Ok(v) => {
            // Trim once so the emptiness check and the "0" check agree on
            // how surrounding whitespace is handled.
            let v = v.trim();
            !v.is_empty() && v != "0"
        }
        // Unset (or non-UTF-8) means the guard is off.
        Err(_) => false,
    }
}
32
33pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
35where
36 F: FnMut(usize) -> Fut,
37 Fut: std::future::Future<Output = anyhow::Result<T>>,
38{
39 let mut last_err: Option<anyhow::Error> = None;
40
41 for attempt in 1..=2 {
42 tracing::info!(attempt, "orchestrator server lazy init attempt");
43 match f(attempt).await {
44 Ok(v) => {
45 if attempt > 1 {
46 tracing::info!(
47 attempt,
48 "orchestrator server lazy init succeeded after retry"
49 );
50 }
51 return Ok(v);
52 }
53 Err(e) => {
54 tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
55 last_err = Some(e);
56 }
57 }
58 }
59
60 tracing::error!("orchestrator server lazy init exhausted retries");
61 match last_err {
63 Some(e) => Err(e),
64 None => anyhow::bail!("init_with_retry: unexpected empty error state"),
65 }
66}
67
/// `(provider_id, model_id)` pair used to key per-model context limits.
pub type ModelKey = (String, String);
70
/// An opencode server handle plus the HTTP client and config needed to talk to it.
pub struct OrchestratorServer {
    // Keeps the embedded `opencode serve` process alive for the lifetime of
    // this struct; `None` when wrapping an externally managed client
    // (test-support constructors).
    _managed: Option<ManagedServer>,
    // HTTP client bound to `base_url`.
    client: Client,
    // Context-window size per (provider_id, model_id), fetched at startup;
    // empty if the fetch failed.
    model_context_limits: HashMap<ModelKey, u64>,
    // Server base URL, normalized with any trailing '/' removed.
    base_url: String,
    // Orchestrator settings loaded from merged config files (defaults when
    // loading fails).
    config: OrchestratorConfig,
}
85
86impl OrchestratorServer {
87 #[allow(clippy::allow_attributes, dead_code)]
96 pub async fn start() -> anyhow::Result<Arc<Self>> {
97 Ok(Arc::new(Self::start_impl().await?))
98 }
99
100 pub async fn start_lazy() -> anyhow::Result<Self> {
110 if managed_guard_enabled() {
111 anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
112 }
113
114 init_with_retry(|_attempt| async { Self::start_impl().await }).await
115 }
116
117 async fn start_impl() -> anyhow::Result<Self> {
119 let cwd = std::env::current_dir().unwrap_or_default();
121 let config = match agentic_config::loader::load_merged(&cwd) {
122 Ok(loaded) => {
123 for w in &loaded.warnings {
124 tracing::warn!("{w}");
125 }
126 loaded.config.orchestrator
127 }
128 Err(e) => {
129 tracing::warn!("Failed to load config, using defaults: {e}");
130 OrchestratorConfig::default()
131 }
132 };
133
134 let managed = ManagedServer::start(ServerOptions::default())
135 .await
136 .context("Failed to start embedded `opencode serve`")?;
137
138 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
140
141 let directory = std::env::current_dir()
142 .ok()
143 .map(|p| p.to_string_lossy().to_string());
144
145 let mut builder = Client::builder().base_url(&base_url);
146 if let Some(dir) = &directory {
147 builder = builder.directory(dir.clone());
148 }
149
150 let client = builder
151 .build()
152 .context("Failed to build opencode-rs HTTP client")?;
153
154 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
156 tracing::warn!("Failed to load model limits: {}", e);
157 HashMap::new()
158 });
159
160 tracing::info!("Loaded {} model context limits", model_context_limits.len());
161
162 Ok(Self {
163 _managed: Some(managed),
164 client,
165 model_context_limits,
166 base_url,
167 config,
168 })
169 }
170
171 pub fn client(&self) -> &Client {
173 &self.client
174 }
175
176 #[allow(clippy::allow_attributes, dead_code)]
178 pub fn base_url(&self) -> &str {
179 &self.base_url
180 }
181
182 pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
184 self.model_context_limits
185 .get(&(provider_id.to_string(), model_id.to_string()))
186 .copied()
187 }
188
189 pub fn session_deadline(&self) -> Duration {
191 Duration::from_secs(self.config.session_deadline_secs)
192 }
193
194 pub fn inactivity_timeout(&self) -> Duration {
196 Duration::from_secs(self.config.inactivity_timeout_secs)
197 }
198
199 pub fn compaction_threshold(&self) -> f64 {
201 self.config.compaction_threshold
202 }
203
204 async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
206 let resp: ProviderListResponse = client.providers().list().await?;
207 let mut limits = HashMap::new();
208
209 for provider in resp.all {
210 for (model_id, model) in provider.models {
211 if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
212 limits.insert((provider.id.clone(), model_id), limit);
213 }
214 }
215 }
216
217 Ok(limits)
218 }
219
220 pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
222 let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
224
225 let text: String = assistant_msg
227 .parts
228 .iter()
229 .filter_map(|p| {
230 if let Part::Text { text, .. } = p {
231 Some(text.as_str())
232 } else {
233 None
234 }
235 })
236 .collect::<Vec<_>>()
237 .join("");
238
239 if text.trim().is_empty() {
240 None
241 } else {
242 Some(text)
243 }
244 }
245}
246
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Shared variant of [`Self::from_client_unshared`].
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Wraps an existing client without spawning an embedded server
    /// (`_managed` stays `None`); limits are empty and config is defaulted.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        // Normalize the URL the same way the real constructor does.
        let raw: String = base_url.into();
        let normalized = raw.trim_end_matches('/').to_string();

        Self {
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            base_url: normalized,
            config: OrchestratorConfig::default(),
        }
    }
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::sync::OnceLock;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    // Serializes tests that mutate the process-wide environment.
    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

    /// Runs `check` with the guard variable set to `value` (removed when
    /// `None`), holding `ENV_LOCK` for the duration and always clearing the
    /// variable afterwards — even when `check` panics.
    fn with_env(value: Option<&str>, check: impl FnOnce()) {
        // Recover from poisoning so one failed test can't wedge the rest of
        // the env-dependent tests behind a poisoned mutex.
        let _guard = ENV_LOCK
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // Drop guard: clears the variable even if `check` panics, so a
        // failing assertion can't leak env state into later tests.
        struct Cleanup;
        impl Drop for Cleanup {
            fn drop(&mut self) {
                unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
            }
        }
        let _cleanup = Cleanup;

        match value {
            Some(v) => unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, v) },
            None => unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) },
        }

        check();
    }

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                assert_eq!(n, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                if n == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let attempts = AtomicUsize::new(0);

        let err = init_with_retry::<(), _, _>(|_| {
            attempts.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        assert!(err.to_string().contains("always fail"));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn managed_guard_disabled_when_env_not_set() {
        with_env(None, || assert!(!managed_guard_enabled()));
    }

    #[test]
    fn managed_guard_enabled_when_env_is_1() {
        with_env(Some("1"), || assert!(managed_guard_enabled()));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_0() {
        with_env(Some("0"), || assert!(!managed_guard_enabled()));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_empty() {
        with_env(Some(""), || assert!(!managed_guard_enabled()));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_whitespace() {
        with_env(Some(" "), || assert!(!managed_guard_enabled()));
    }

    #[test]
    fn managed_guard_enabled_when_env_is_truthy() {
        with_env(Some("true"), || assert!(managed_guard_enabled()));
    }
}