// opencode_orchestrator_mcp/server.rs

//! Shared orchestrator server state.
//!
//! Wraps `ManagedServer` + `Client` + cached model context limits + config.

use agentic_config::types::OrchestratorConfig;
use anyhow::Context;
use opencode_rs::Client;
use opencode_rs::server::ManagedServer;
use opencode_rs::server::ServerOptions;
use opencode_rs::types::message::Message;
use opencode_rs::types::message::Part;
use opencode_rs::types::provider::ProviderListResponse;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::version;
18
/// Environment variable name for the orchestrator-managed recursion guard.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// User-facing message returned when orchestrator tools are invoked in a nested context.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
     in a nested orchestration session. Consult a human for assistance or try to accomplish your \
     task without the orchestration tools.";

/// Check if the orchestrator-managed env var is set (guard enabled).
///
/// The guard is enabled for any value that is non-empty after trimming and is
/// not `"0"` (e.g. `"1"`, `"true"`). Unset, empty, whitespace-only, `"0"`,
/// and whitespace-padded `" 0 "` all leave the guard disabled.
pub fn managed_guard_enabled() -> bool {
    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
        // Trim before comparing so whitespace-padded values like " 0 " are
        // still recognized as the "disabled" sentinel (previously " 0 "
        // incorrectly enabled the guard because the comparison ran on the
        // untrimmed string).
        Ok(v) => {
            let v = v.trim();
            !v.is_empty() && v != "0"
        }
        Err(_) => false,
    }
}
34
35/// Retry an async init operation once (2 total attempts) with tracing logs.
36pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
37where
38    F: FnMut(usize) -> Fut,
39    Fut: std::future::Future<Output = anyhow::Result<T>>,
40{
41    let mut last_err: Option<anyhow::Error> = None;
42
43    for attempt in 1..=2 {
44        tracing::info!(attempt, "orchestrator server lazy init attempt");
45        match f(attempt).await {
46            Ok(v) => {
47                if attempt > 1 {
48                    tracing::info!(
49                        attempt,
50                        "orchestrator server lazy init succeeded after retry"
51                    );
52                }
53                return Ok(v);
54            }
55            Err(e) => {
56                tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
57                last_err = Some(e);
58            }
59        }
60    }
61
62    tracing::error!("orchestrator server lazy init exhausted retries");
63    // Safety: The loop always runs at least once and sets last_err on failure
64    match last_err {
65        Some(e) => Err(e),
66        None => anyhow::bail!("init_with_retry: unexpected empty error state"),
67    }
68}
69
/// Key for looking up model context limits: (`provider_id`, `model_id`)
///
/// Used as the key type of the cached model-context-limit map.
pub type ModelKey = (String, String);
72
/// Shared state wrapping the managed `OpenCode` server and HTTP client.
pub struct OrchestratorServer {
    /// Keep alive for lifecycle; Drop kills the opencode serve process.
    /// `None` when using an external client (e.g., wiremock tests).
    _managed: Option<ManagedServer>,
    /// HTTP client for `OpenCode` API
    client: Client,
    /// Cached model context limits from GET /provider
    /// (keyed by (`provider_id`, `model_id`); empty if loading failed).
    model_context_limits: HashMap<ModelKey, u64>,
    /// Base URL of the managed server (stored without a trailing slash)
    base_url: String,
    /// Orchestrator configuration (session timeouts, compaction threshold)
    config: OrchestratorConfig,
}
87
88impl OrchestratorServer {
89    /// Start a new managed `OpenCode` server and build the client.
90    ///
91    /// This is the eager initialization path that spawns the server immediately.
92    /// Prefer `start_lazy()` for deferred initialization.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the server fails to start or the client cannot be built.
97    #[allow(clippy::allow_attributes, dead_code)]
98    pub async fn start() -> anyhow::Result<Arc<Self>> {
99        Ok(Arc::new(Self::start_impl().await?))
100    }
101
    /// Lazy initialization path for `OnceCell` usage.
    ///
    /// Checks the recursion guard env var first, then uses retry logic.
    /// Returns `Self` (not `Arc<Self>`) for direct storage in `OnceCell`.
    ///
    /// # Errors
    ///
    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
    /// Returns an error if the server fails to start after 2 attempts.
    pub async fn start_lazy() -> anyhow::Result<Self> {
        // Same path as the config-injecting variant, just with no extra config.
        Self::start_lazy_with_config(None).await
    }
114
115    /// Start the orchestrator server lazily with optional config injection.
116    ///
117    /// # Arguments
118    ///
119    /// * `config_json` - Optional JSON config to inject via `OPENCODE_CONFIG_CONTENT`
120    ///
121    /// # Errors
122    ///
123    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
124    /// Returns an error if the server fails to start after 2 attempts.
125    pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
126        if managed_guard_enabled() {
127            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
128        }
129
130        init_with_retry(|_attempt| {
131            let cfg = config_json.clone();
132            async move { Self::start_impl_with_config(cfg).await }
133        })
134        .await
135    }
136
137    /// Internal implementation that actually spawns the server.
138    async fn start_impl() -> anyhow::Result<Self> {
139        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
140
141        // Load configuration (best-effort, use defaults if unavailable)
142        let config = match agentic_config::loader::load_merged(&cwd) {
143            Ok(loaded) => {
144                for w in &loaded.warnings {
145                    tracing::warn!("{w}");
146                }
147                loaded.config.orchestrator
148            }
149            Err(e) => {
150                tracing::warn!("Failed to load config, using defaults: {e}");
151                OrchestratorConfig::default()
152            }
153        };
154
155        let launcher_config = version::resolve_launcher_config(&cwd)
156            .context("Failed to resolve OpenCode launcher configuration")?;
157
158        tracing::info!(
159            binary = %launcher_config.binary,
160            launcher_args = ?launcher_config.launcher_args,
161            expected_version = %version::PINNED_OPENCODE_VERSION,
162            "starting embedded opencode serve (pinned stable)"
163        );
164
165        let opts = ServerOptions::default()
166            .binary(&launcher_config.binary)
167            .launcher_args(launcher_config.launcher_args)
168            .directory(cwd.clone());
169
170        let managed = ManagedServer::start(opts)
171            .await
172            .context("Failed to start embedded `opencode serve`")?;
173
174        // Avoid trailing slash to prevent `//event` formatting
175        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
176
177        let client = Client::builder()
178            .base_url(&base_url)
179            .directory(cwd.to_string_lossy().to_string())
180            .build()
181            .context("Failed to build opencode-rs HTTP client")?;
182
183        let health = client
184            .misc()
185            .health()
186            .await
187            .context("Failed to fetch /global/health for version validation")?;
188
189        version::validate_exact_version(health.version.as_deref()).with_context(|| {
190            format!(
191                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
192                version::PINNED_OPENCODE_VERSION,
193                launcher_config.binary
194            )
195        })?;
196
197        // Load model context limits (best-effort, don't fail if unavailable)
198        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
199            tracing::warn!("Failed to load model limits: {}", e);
200            HashMap::new()
201        });
202
203        tracing::info!("Loaded {} model context limits", model_context_limits.len());
204
205        Ok(Self {
206            _managed: Some(managed),
207            client,
208            model_context_limits,
209            base_url,
210            config,
211        })
212    }
213
    /// Internal implementation with optional config injection.
    ///
    /// Startup sequence:
    /// 1. Load orchestrator config from disk (best-effort; defaults on failure).
    /// 2. Resolve the pinned launcher and spawn `opencode serve`.
    /// 3. Build the HTTP client against the managed server's URL.
    /// 4. Validate the server version via the health endpoint (fatal on mismatch).
    /// 5. Cache model context limits (best-effort).
    async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;

        // Load configuration (best-effort, use defaults if unavailable)
        let config = match agentic_config::loader::load_merged(&cwd) {
            Ok(loaded) => {
                // Surface non-fatal config issues without aborting startup.
                for w in &loaded.warnings {
                    tracing::warn!("{w}");
                }
                loaded.config.orchestrator
            }
            Err(e) => {
                tracing::warn!("Failed to load config, using defaults: {e}");
                OrchestratorConfig::default()
            }
        };

        let launcher_config = version::resolve_launcher_config(&cwd)
            .context("Failed to resolve OpenCode launcher configuration")?;

        tracing::info!(
            binary = %launcher_config.binary,
            launcher_args = ?launcher_config.launcher_args,
            expected_version = %version::PINNED_OPENCODE_VERSION,
            config_injected = config_json.is_some(),
            "starting embedded opencode serve (pinned stable)"
        );

        let mut opts = ServerOptions::default()
            .binary(&launcher_config.binary)
            .launcher_args(launcher_config.launcher_args)
            .directory(cwd.clone());

        // Inject config if provided
        if let Some(cfg) = config_json {
            opts = opts.config_json(cfg);
        }

        let managed = ManagedServer::start(opts)
            .await
            .context("Failed to start embedded `opencode serve`")?;

        // Avoid trailing slash to prevent `//event` formatting
        let base_url = managed.url().to_string().trim_end_matches('/').to_string();

        let client = Client::builder()
            .base_url(&base_url)
            .directory(cwd.to_string_lossy().to_string())
            .build()
            .context("Failed to build opencode-rs HTTP client")?;

        let health = client
            .misc()
            .health()
            .await
            .context("Failed to fetch /global/health for version validation")?;

        // Fail fast if the spawned binary is not the pinned stable release.
        version::validate_exact_version(health.version.as_deref()).with_context(|| {
            format!(
                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
                version::PINNED_OPENCODE_VERSION,
                launcher_config.binary
            )
        })?;

        // Load model context limits (best-effort, don't fail if unavailable)
        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
            tracing::warn!("Failed to load model limits: {}", e);
            HashMap::new()
        });

        tracing::info!("Loaded {} model context limits", model_context_limits.len());

        Ok(Self {
            _managed: Some(managed),
            client,
            model_context_limits,
            base_url,
            config,
        })
    }
296
    /// Get the HTTP client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Get the base URL of the managed server.
    ///
    /// The URL is stored without a trailing slash.
    #[allow(clippy::allow_attributes, dead_code)]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
307
308    /// Look up context limit for a specific model.
309    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
310        self.model_context_limits
311            .get(&(provider_id.to_string(), model_id.to_string()))
312            .copied()
313    }
314
    /// Get the session deadline duration.
    ///
    /// Converted from `config.session_deadline_secs`.
    pub fn session_deadline(&self) -> Duration {
        Duration::from_secs(self.config.session_deadline_secs)
    }

    /// Get the inactivity timeout duration.
    ///
    /// Converted from `config.inactivity_timeout_secs`.
    pub fn inactivity_timeout(&self) -> Duration {
        Duration::from_secs(self.config.inactivity_timeout_secs)
    }

    /// Get the compaction threshold (0.0 - 1.0).
    pub fn compaction_threshold(&self) -> f64 {
        self.config.compaction_threshold
    }
329
330    /// Load model context limits from GET /provider.
331    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
332        let resp: ProviderListResponse = client.providers().list().await?;
333        let mut limits = HashMap::new();
334
335        for provider in resp.all {
336            for (model_id, model) in provider.models {
337                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
338                    limits.insert((provider.id.clone(), model_id), limit);
339                }
340            }
341        }
342
343        Ok(limits)
344    }
345
346    /// Extract text content from the last assistant message.
347    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
348        // Find the last assistant message
349        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
350
351        // Join all text parts
352        let text: String = assistant_msg
353            .parts
354            .iter()
355            .filter_map(|p| {
356                if let Part::Text { text, .. } = p {
357                    Some(text.as_str())
358                } else {
359                    None
360                }
361            })
362            .collect::<Vec<_>>()
363            .join("");
364
365        if text.trim().is_empty() {
366            None
367        } else {
368            Some(text)
369        }
370    }
371}
372
/// Test support utilities (requires `test-support` feature).
///
/// These functions may appear unused when compiling non-test targets because
/// cargo's feature unification enables the feature for all targets when tests
/// are compiled. The `dead_code` warning is expected and suppressed.
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Build an `OrchestratorServer` wrapper around an existing client.
    ///
    /// Does NOT manage an opencode process (intended for wiremock tests).
    /// Model context limits are not loaded and will return `None` for all lookups.
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        Arc::new(Self::from_client_unshared(client, base_url))
    }

    /// Build an `OrchestratorServer` wrapper returning `Self` (not `Arc<Self>`).
    ///
    /// Useful for tests that need to populate an `OnceCell` directly.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        Self {
            // No managed process: Drop does nothing server-related.
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            // Normalize like the real startup path: strip any trailing '/'.
            base_url: base_url.into().trim_end_matches('/').to_string(),
            config: OrchestratorConfig::default(),
        }
    }
}
402
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                // Always succeed
                assert_eq!(n, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let attempts = AtomicUsize::new(0);

        let result: u32 = init_with_retry(|_| {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            async move {
                // Fail only the first attempt to exercise the retry path.
                if n == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(result, 42);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let attempts = AtomicUsize::new(0);

        let err = init_with_retry::<(), _, _>(|_| {
            attempts.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        // The error surfaced is the one from the final attempt.
        assert!(err.to_string().contains("always fail"));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // The guard tests below mutate process-global env state, so each one is
    // serialized with #[serial(env)] to avoid cross-test interference.

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_not_set() {
        // Ensure the env var is not set
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
        assert!(!managed_guard_enabled());
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_1() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
        assert!(managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_0() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_empty() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_whitespace() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "   ") };
        assert!(!managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_truthy() {
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
        assert!(managed_guard_enabled());
        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }
}