Skip to main content

opencode_orchestrator_mcp/
server.rs

1//! Shared orchestrator server state.
2//!
3//! Wraps `ManagedServer` + `Client` + cached model context limits + config.
4
5use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18
19use crate::version;
20
/// Name of the environment variable that acts as the recursion guard for
/// orchestrator-managed sessions.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// Error text surfaced to the caller when orchestrator tools are invoked while the
/// recursion guard is active (typically from inside a nested orchestration session).
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. \
    This most commonly happens when you're in a nested orchestration session. \
    Consult a human for assistance or try to accomplish your task without the orchestration tools.";
28
29/// Check if the orchestrator-managed env var is set (guard enabled).
30pub fn managed_guard_enabled() -> bool {
31    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
32        Ok(v) => v != "0" && !v.trim().is_empty(),
33        Err(_) => false,
34    }
35}
36
37/// Retry an async init operation once (2 total attempts) with tracing logs.
38pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
39where
40    F: FnMut(usize) -> Fut,
41    Fut: std::future::Future<Output = anyhow::Result<T>>,
42{
43    let mut last_err: Option<anyhow::Error> = None;
44
45    for attempt in 1..=2 {
46        tracing::info!(attempt, "orchestrator server lazy init attempt");
47        match f(attempt).await {
48            Ok(v) => {
49                if attempt > 1 {
50                    tracing::info!(
51                        attempt,
52                        "orchestrator server lazy init succeeded after retry"
53                    );
54                }
55                return Ok(v);
56            }
57            Err(e) => {
58                tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
59                last_err = Some(e);
60            }
61        }
62    }
63
64    tracing::error!("orchestrator server lazy init exhausted retries");
65    // Safety: The loop always runs at least once and sets last_err on failure
66    match last_err {
67        Some(e) => Err(e),
68        None => anyhow::bail!("init_with_retry: unexpected empty error state"),
69    }
70}
71
/// Key for looking up model context limits: (`provider_id`, `model_id`).
///
/// Both components are owned `String`s, so a lookup by `&str` has to build an
/// owned key first (see `OrchestratorServer::context_limit`).
pub type ModelKey = (String, String);
74
/// Shared state wrapping the managed `OpenCode` server and HTTP client.
///
/// Bundles the optional managed process handle, the API client, the cached model
/// context limits, the server base URL, the orchestrator configuration, and the set
/// of session IDs spawned through this instance.
pub struct OrchestratorServer {
    /// Keep alive for lifecycle; Drop kills the opencode serve process.
    /// `None` when using an external client (e.g., wiremock tests).
    _managed: Option<ManagedServer>,
    /// HTTP client for `OpenCode` API
    client: Client,
    /// Cached model context limits from GET /provider, keyed by
    /// (`provider_id`, `model_id`). Empty when the lookup failed at startup or
    /// when the instance was built around an external client.
    model_context_limits: HashMap<ModelKey, u64>,
    /// Base URL of the managed server, stored without a trailing `/`.
    base_url: String,
    /// Orchestrator configuration (session timeouts, compaction threshold)
    config: OrchestratorConfig,
    /// Session IDs created by this orchestrator instance.
    /// Starts empty; callers obtain the shared handle via `spawned_sessions()`.
    spawned_sessions: Arc<RwLock<HashSet<String>>>,
}
91
92impl OrchestratorServer {
93    /// Start a new managed `OpenCode` server and build the client.
94    ///
95    /// This is the eager initialization path that spawns the server immediately.
96    /// Prefer `start_lazy()` for deferred initialization.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the server fails to start or the client cannot be built.
101    #[allow(clippy::allow_attributes, dead_code)]
102    pub async fn start() -> anyhow::Result<Arc<Self>> {
103        Ok(Arc::new(Self::start_impl().await?))
104    }
105
106    /// Lazy initialization path for `OnceCell` usage.
107    ///
108    /// Checks the recursion guard env var first, then uses retry logic.
109    /// Returns `Self` (not `Arc<Self>`) for direct storage in `OnceCell`.
110    ///
111    /// # Errors
112    ///
113    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
114    /// Returns an error if the server fails to start after 2 attempts.
115    pub async fn start_lazy() -> anyhow::Result<Self> {
116        Self::start_lazy_with_config(None).await
117    }
118
119    /// Start the orchestrator server lazily with optional config injection.
120    ///
121    /// # Arguments
122    ///
123    /// * `config_json` - Optional JSON config to inject via `OPENCODE_CONFIG_CONTENT`
124    ///
125    /// # Errors
126    ///
127    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
128    /// Returns an error if the server fails to start after 2 attempts.
129    pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
130        if managed_guard_enabled() {
131            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
132        }
133
134        init_with_retry(|_attempt| {
135            let cfg = config_json.clone();
136            async move { Self::start_impl_with_config(cfg).await }
137        })
138        .await
139    }
140
141    /// Internal implementation that actually spawns the server.
142    async fn start_impl() -> anyhow::Result<Self> {
143        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
144
145        // Load configuration (best-effort, use defaults if unavailable)
146        let config = match agentic_config::loader::load_merged(&cwd) {
147            Ok(loaded) => {
148                for w in &loaded.warnings {
149                    tracing::warn!("{w}");
150                }
151                loaded.config.orchestrator
152            }
153            Err(e) => {
154                tracing::warn!("Failed to load config, using defaults: {e}");
155                OrchestratorConfig::default()
156            }
157        };
158
159        let launcher_config = version::resolve_launcher_config(&cwd)
160            .context("Failed to resolve OpenCode launcher configuration")?;
161
162        tracing::info!(
163            binary = %launcher_config.binary,
164            launcher_args = ?launcher_config.launcher_args,
165            expected_version = %version::PINNED_OPENCODE_VERSION,
166            "starting embedded opencode serve (pinned stable)"
167        );
168
169        let opts = ServerOptions::default()
170            .binary(&launcher_config.binary)
171            .launcher_args(launcher_config.launcher_args)
172            .directory(cwd.clone());
173
174        let managed = ManagedServer::start(opts)
175            .await
176            .context("Failed to start embedded `opencode serve`")?;
177
178        // Avoid trailing slash to prevent `//event` formatting
179        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
180
181        let client = Client::builder()
182            .base_url(&base_url)
183            .directory(cwd.to_string_lossy().to_string())
184            .build()
185            .context("Failed to build opencode-rs HTTP client")?;
186
187        let health = client
188            .misc()
189            .health()
190            .await
191            .context("Failed to fetch /global/health for version validation")?;
192
193        version::validate_exact_version(health.version.as_deref()).with_context(|| {
194            format!(
195                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
196                version::PINNED_OPENCODE_VERSION,
197                launcher_config.binary
198            )
199        })?;
200
201        // Load model context limits (best-effort, don't fail if unavailable)
202        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
203            tracing::warn!("Failed to load model limits: {}", e);
204            HashMap::new()
205        });
206
207        tracing::info!("Loaded {} model context limits", model_context_limits.len());
208
209        Ok(Self {
210            _managed: Some(managed),
211            client,
212            model_context_limits,
213            base_url,
214            config,
215            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
216        })
217    }
218
219    /// Internal implementation with optional config injection.
220    async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
221        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
222
223        // Load configuration (best-effort, use defaults if unavailable)
224        let config = match agentic_config::loader::load_merged(&cwd) {
225            Ok(loaded) => {
226                for w in &loaded.warnings {
227                    tracing::warn!("{w}");
228                }
229                loaded.config.orchestrator
230            }
231            Err(e) => {
232                tracing::warn!("Failed to load config, using defaults: {e}");
233                OrchestratorConfig::default()
234            }
235        };
236
237        let launcher_config = version::resolve_launcher_config(&cwd)
238            .context("Failed to resolve OpenCode launcher configuration")?;
239
240        tracing::info!(
241            binary = %launcher_config.binary,
242            launcher_args = ?launcher_config.launcher_args,
243            expected_version = %version::PINNED_OPENCODE_VERSION,
244            config_injected = config_json.is_some(),
245            "starting embedded opencode serve (pinned stable)"
246        );
247
248        let mut opts = ServerOptions::default()
249            .binary(&launcher_config.binary)
250            .launcher_args(launcher_config.launcher_args)
251            .directory(cwd.clone());
252
253        // Inject config if provided
254        if let Some(cfg) = config_json {
255            opts = opts.config_json(cfg);
256        }
257
258        let managed = ManagedServer::start(opts)
259            .await
260            .context("Failed to start embedded `opencode serve`")?;
261
262        // Avoid trailing slash to prevent `//event` formatting
263        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
264
265        let client = Client::builder()
266            .base_url(&base_url)
267            .directory(cwd.to_string_lossy().to_string())
268            .build()
269            .context("Failed to build opencode-rs HTTP client")?;
270
271        let health = client
272            .misc()
273            .health()
274            .await
275            .context("Failed to fetch /global/health for version validation")?;
276
277        version::validate_exact_version(health.version.as_deref()).with_context(|| {
278            format!(
279                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
280                version::PINNED_OPENCODE_VERSION,
281                launcher_config.binary
282            )
283        })?;
284
285        // Load model context limits (best-effort, don't fail if unavailable)
286        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
287            tracing::warn!("Failed to load model limits: {}", e);
288            HashMap::new()
289        });
290
291        tracing::info!("Loaded {} model context limits", model_context_limits.len());
292
293        Ok(Self {
294            _managed: Some(managed),
295            client,
296            model_context_limits,
297            base_url,
298            config,
299            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
300        })
301    }
302
303    /// Get the HTTP client.
304    pub fn client(&self) -> &Client {
305        &self.client
306    }
307
308    /// Get the base URL of the managed server.
309    #[allow(clippy::allow_attributes, dead_code)]
310    pub fn base_url(&self) -> &str {
311        &self.base_url
312    }
313
314    /// Look up context limit for a specific model.
315    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
316        self.model_context_limits
317            .get(&(provider_id.to_string(), model_id.to_string()))
318            .copied()
319    }
320
321    /// Get the session deadline duration.
322    pub fn session_deadline(&self) -> Duration {
323        Duration::from_secs(self.config.session_deadline_secs)
324    }
325
326    /// Get the inactivity timeout duration.
327    pub fn inactivity_timeout(&self) -> Duration {
328        Duration::from_secs(self.config.inactivity_timeout_secs)
329    }
330
331    /// Get the compaction threshold (0.0 - 1.0).
332    pub fn compaction_threshold(&self) -> f64 {
333        self.config.compaction_threshold
334    }
335
336    /// Returns session IDs created by this orchestrator instance.
337    pub fn spawned_sessions(&self) -> &Arc<RwLock<HashSet<String>>> {
338        &self.spawned_sessions
339    }
340
341    /// Load model context limits from GET /provider.
342    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
343        let resp: ProviderListResponse = client.providers().list().await?;
344        let mut limits = HashMap::new();
345
346        for provider in resp.all {
347            for (model_id, model) in provider.models {
348                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
349                    limits.insert((provider.id.clone(), model_id), limit);
350                }
351            }
352        }
353
354        Ok(limits)
355    }
356
357    /// Extract text content from the last assistant message.
358    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
359        // Find the last assistant message
360        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
361
362        // Join all text parts
363        let text: String = assistant_msg
364            .parts
365            .iter()
366            .filter_map(|p| {
367                if let Part::Text { text, .. } = p {
368                    Some(text.as_str())
369                } else {
370                    None
371                }
372            })
373            .collect::<Vec<_>>()
374            .join("");
375
376        if text.trim().is_empty() {
377            None
378        } else {
379            Some(text)
380        }
381    }
382}
383
/// Constructors for tests (compiled only with the `test-support` feature).
///
/// Cargo's feature unification can enable the feature for non-test targets while
/// tests are being compiled, in which case these functions look unused. The
/// resulting `dead_code` warning is expected and suppressed here.
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Wrap an existing client in a shared `OrchestratorServer`.
    ///
    /// No opencode process is managed (intended for wiremock tests), and no model
    /// context limits are loaded, so every limit lookup returns `None`.
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Wrap an existing client, returning `Self` rather than `Arc<Self>`.
    ///
    /// Handy for tests that need to populate an `OnceCell` directly.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        // Normalize the URL: stored without trailing slashes.
        let raw_url: String = base_url.into();
        let normalized_url = raw_url.trim_end_matches('/').to_string();

        Self {
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            base_url: normalized_url,
            config: OrchestratorConfig::default(),
            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
        }
    }
}
414
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    /// Set the guard env var to `value`.
    fn set_guard_env(value: &str) {
        // SAFETY: Only called from tests serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, value) };
    }

    /// Remove the guard env var.
    fn clear_guard_env() {
        // SAFETY: Only called from tests serialized by #[serial(env)], preventing concurrent env access.
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
    }

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let calls = AtomicUsize::new(0);

        let outcome = init_with_retry(|_| {
            let prior_calls = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                // The closure must never be invoked a second time after a success.
                assert_eq!(prior_calls, 0, "should only be called once on success");
                Ok(42_u32)
            }
        })
        .await;

        let value = outcome.unwrap();
        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let calls = AtomicUsize::new(0);

        let outcome = init_with_retry(|_| {
            let prior_calls = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                // Fail only the very first invocation.
                if prior_calls == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42_u32)
            }
        })
        .await;

        let value = outcome.unwrap();
        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let calls = AtomicUsize::new(0);

        let outcome = init_with_retry::<(), _, _>(|_| {
            calls.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await;

        let failure = outcome.unwrap_err();
        assert!(failure.to_string().contains("always fail"));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_not_set() {
        // Ensure the env var is not set
        clear_guard_env();
        assert!(!managed_guard_enabled());
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_1() {
        set_guard_env("1");
        assert!(managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_0() {
        set_guard_env("0");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_empty() {
        set_guard_env("");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_disabled_when_env_is_whitespace() {
        set_guard_env("   ");
        assert!(!managed_guard_enabled());
        clear_guard_env();
    }

    #[test]
    #[serial(env)]
    fn managed_guard_enabled_when_env_is_truthy() {
        set_guard_env("true");
        assert!(managed_guard_enabled());
        clear_guard_env();
    }
}