// opencode_orchestrator_mcp/server.rs
1//! Shared orchestrator server state.
2//!
3//! Wraps `ManagedServer` + `Client` + cached model context limits + config.
4
5use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::version;
18
/// Environment variable name for the orchestrator-managed recursion guard.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// User-facing message returned when orchestrator tools are invoked in a nested context.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
     in a nested orchestration session. Consult a human for assistance or try to accomplish your \
     task without the orchestration tools.";

/// Check if the orchestrator-managed env var is set (guard enabled).
///
/// The guard is enabled for any value that, after trimming, is non-empty and
/// not `"0"`. Trimming once up front fixes an inconsistency where `" 0 "`
/// enabled the guard even though `"0"` and pure whitespace both disabled it.
pub fn managed_guard_enabled() -> bool {
    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
        Ok(v) => {
            // Normalize once so the "0" opt-out and the emptiness check agree
            // on how surrounding whitespace is treated.
            let v = v.trim();
            v != "0" && !v.is_empty()
        }
        // Unset (or non-Unicode) value: guard disabled.
        Err(_) => false,
    }
}
34
35/// Retry an async init operation once (2 total attempts) with tracing logs.
36pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
37where
38    F: FnMut(usize) -> Fut,
39    Fut: std::future::Future<Output = anyhow::Result<T>>,
40{
41    let mut last_err: Option<anyhow::Error> = None;
42
43    for attempt in 1..=2 {
44        tracing::info!(attempt, "orchestrator server lazy init attempt");
45        match f(attempt).await {
46            Ok(v) => {
47                if attempt > 1 {
48                    tracing::info!(
49                        attempt,
50                        "orchestrator server lazy init succeeded after retry"
51                    );
52                }
53                return Ok(v);
54            }
55            Err(e) => {
56                tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
57                last_err = Some(e);
58            }
59        }
60    }
61
62    tracing::error!("orchestrator server lazy init exhausted retries");
63    // Safety: The loop always runs at least once and sets last_err on failure
64    match last_err {
65        Some(e) => Err(e),
66        None => anyhow::bail!("init_with_retry: unexpected empty error state"),
67    }
68}
69
/// Key for looking up model context limits: (`provider_id`, `model_id`).
///
/// Used as the key type of the cached limits map built from GET /provider.
pub type ModelKey = (String, String);
72
/// Shared state wrapping the managed `OpenCode` server and HTTP client.
pub struct OrchestratorServer {
    /// Keep alive for lifecycle; Drop kills the opencode serve process.
    /// `None` when using an external client (e.g., wiremock tests).
    _managed: Option<ManagedServer>,
    /// HTTP client for `OpenCode` API
    client: Client,
    /// Cached model context limits from GET /provider.
    /// Best-effort: may be empty if the provider listing failed at startup.
    model_context_limits: HashMap<ModelKey, u64>,
    /// Base URL of the managed server (stored without a trailing slash).
    base_url: String,
    /// Orchestrator configuration (session timeouts, compaction threshold)
    config: OrchestratorConfig,
}
87
88impl OrchestratorServer {
89    /// Start a new managed `OpenCode` server and build the client.
90    ///
91    /// This is the eager initialization path that spawns the server immediately.
92    /// Prefer `start_lazy()` for deferred initialization.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the server fails to start or the client cannot be built.
97    #[allow(clippy::allow_attributes, dead_code)]
98    pub async fn start() -> anyhow::Result<Arc<Self>> {
99        Ok(Arc::new(Self::start_impl().await?))
100    }
101
102    /// Lazy initialization path for `OnceCell` usage.
103    ///
104    /// Checks the recursion guard env var first, then uses retry logic.
105    /// Returns `Self` (not `Arc<Self>`) for direct storage in `OnceCell`.
106    ///
107    /// # Errors
108    ///
109    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
110    /// Returns an error if the server fails to start after 2 attempts.
111    pub async fn start_lazy() -> anyhow::Result<Self> {
112        if managed_guard_enabled() {
113            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
114        }
115
116        init_with_retry(|_attempt| async { Self::start_impl().await }).await
117    }
118
    /// Internal implementation that actually spawns the server.
    ///
    /// Sequence: load config (best-effort) -> resolve launcher -> spawn
    /// `opencode serve` -> build HTTP client -> validate the pinned version
    /// via the health endpoint -> cache model context limits (best-effort).
    async fn start_impl() -> anyhow::Result<Self> {
        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;

        // Load configuration (best-effort, use defaults if unavailable)
        let config = match agentic_config::loader::load_merged(&cwd) {
            Ok(loaded) => {
                // Surface non-fatal config warnings before using the merged result.
                for w in &loaded.warnings {
                    tracing::warn!("{w}");
                }
                loaded.config.orchestrator
            }
            Err(e) => {
                tracing::warn!("Failed to load config, using defaults: {e}");
                OrchestratorConfig::default()
            }
        };

        // Unlike config loading, a bad launcher config is fatal: we cannot
        // spawn the server without it.
        let launcher_config = version::resolve_launcher_config(&cwd)
            .context("Failed to resolve OpenCode launcher configuration")?;

        tracing::info!(
            binary = %launcher_config.binary,
            launcher_args = ?launcher_config.launcher_args,
            expected_version = %version::PINNED_OPENCODE_VERSION,
            "starting embedded opencode serve (pinned stable)"
        );

        let opts = ServerOptions::default()
            .binary(&launcher_config.binary)
            .launcher_args(launcher_config.launcher_args)
            .directory(cwd.clone());

        let managed = ManagedServer::start(opts)
            .await
            .context("Failed to start embedded `opencode serve`")?;

        // Avoid trailing slash to prevent `//event` formatting
        let base_url = managed.url().to_string().trim_end_matches('/').to_string();

        let client = Client::builder()
            .base_url(&base_url)
            .directory(cwd.to_string_lossy().to_string())
            .build()
            .context("Failed to build opencode-rs HTTP client")?;

        // The health call doubles as a liveness probe and the version source.
        let health = client
            .misc()
            .health()
            .await
            .context("Failed to fetch /global/health for version validation")?;

        // Refuse to run against a server that is not the pinned stable version.
        version::validate_exact_version(health.version.as_deref()).with_context(|| {
            format!(
                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
                version::PINNED_OPENCODE_VERSION,
                launcher_config.binary
            )
        })?;

        // Load model context limits (best-effort, don't fail if unavailable)
        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
            tracing::warn!("Failed to load model limits: {}", e);
            HashMap::new()
        });

        tracing::info!("Loaded {} model context limits", model_context_limits.len());

        Ok(Self {
            _managed: Some(managed),
            client,
            model_context_limits,
            base_url,
            config,
        })
    }
195
196    /// Get the HTTP client.
197    pub fn client(&self) -> &Client {
198        &self.client
199    }
200
201    /// Get the base URL of the managed server.
202    #[allow(clippy::allow_attributes, dead_code)]
203    pub fn base_url(&self) -> &str {
204        &self.base_url
205    }
206
207    /// Look up context limit for a specific model.
208    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
209        self.model_context_limits
210            .get(&(provider_id.to_string(), model_id.to_string()))
211            .copied()
212    }
213
214    /// Get the session deadline duration.
215    pub fn session_deadline(&self) -> Duration {
216        Duration::from_secs(self.config.session_deadline_secs)
217    }
218
219    /// Get the inactivity timeout duration.
220    pub fn inactivity_timeout(&self) -> Duration {
221        Duration::from_secs(self.config.inactivity_timeout_secs)
222    }
223
224    /// Get the compaction threshold (0.0 - 1.0).
225    pub fn compaction_threshold(&self) -> f64 {
226        self.config.compaction_threshold
227    }
228
229    /// Load model context limits from GET /provider.
230    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
231        let resp: ProviderListResponse = client.providers().list().await?;
232        let mut limits = HashMap::new();
233
234        for provider in resp.all {
235            for (model_id, model) in provider.models {
236                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
237                    limits.insert((provider.id.clone(), model_id), limit);
238                }
239            }
240        }
241
242        Ok(limits)
243    }
244
245    /// Extract text content from the last assistant message.
246    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
247        // Find the last assistant message
248        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
249
250        // Join all text parts
251        let text: String = assistant_msg
252            .parts
253            .iter()
254            .filter_map(|p| {
255                if let Part::Text { text, .. } = p {
256                    Some(text.as_str())
257                } else {
258                    None
259                }
260            })
261            .collect::<Vec<_>>()
262            .join("");
263
264        if text.trim().is_empty() {
265            None
266        } else {
267            Some(text)
268        }
269    }
270}
271
/// Test support utilities (requires `test-support` feature).
///
/// These functions may appear unused when compiling non-test targets because
/// cargo's feature unification enables the feature for all targets when tests
/// are compiled. The `dead_code` warning is expected and suppressed.
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Build an `OrchestratorServer` wrapper around an existing client.
    ///
    /// Does NOT manage an opencode process (intended for wiremock tests).
    /// Model context limits are not loaded and will return `None` for all lookups.
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Build an `OrchestratorServer` wrapper returning `Self` (not `Arc<Self>`).
    ///
    /// Useful for tests that need to populate an `OnceCell` directly.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        // Normalize the same way as the managed path: no trailing slash.
        let raw: String = base_url.into();
        let normalized = raw.trim_end_matches('/').to_string();

        Self {
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            base_url: normalized,
            config: OrchestratorConfig::default(),
        }
    }
}
301
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    // Success on the first attempt must not invoke the closure again.
    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                // Always succeed
                assert_eq!(n, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
    }

    // One failure followed by a success exercises the single-retry path.
    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                if n == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // Persistent failure: exactly 2 attempts, last error is surfaced.
    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let attempts = AtomicUsize::new(0);

        let err = init_with_retry::<(), _, _>(|_| {
            attempts.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        assert!(err.to_string().contains("always fail"));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_not_set() {
        // Ensure the env var is not set
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
        assert!(!managed_guard_enabled());
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_1() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
        assert!(managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    // "0" is the explicit opt-out value.
    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_0() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_empty() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    // Whitespace-only values count as empty.
    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_whitespace() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "   ") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    // Any non-"0", non-empty value enables the guard, not just "1".
    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_truthy() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
        assert!(managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }
}
421}