Skip to main content

opencode_orchestrator_mcp/
server.rs

1//! Shared orchestrator server state.
2//!
3//! Wraps `ManagedServer` + `Client` + cached model context limits + config.
4
5use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Environment variable name for the orchestrator-managed recursion guard.
///
/// Presumably set in sessions spawned by the orchestrator so nested sessions
/// can detect they are already managed — see `managed_guard_enabled` for how
/// the value is interpreted (any non-"0", non-blank value enables the guard).
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// User-facing message returned when orchestrator tools are invoked in a nested context.
/// (Trailing backslashes continue the literal; leading spaces on the next line are kept.)
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
     in a nested orchestration session. Consult a human for assistance or try to accomplish your \
     task without the orchestration tools.";
24
25/// Check if the orchestrator-managed env var is set (guard enabled).
26pub fn managed_guard_enabled() -> bool {
27    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
28        Ok(v) => v != "0" && !v.trim().is_empty(),
29        Err(_) => false,
30    }
31}
32
33/// Retry an async init operation once (2 total attempts) with tracing logs.
34pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
35where
36    F: FnMut(usize) -> Fut,
37    Fut: std::future::Future<Output = anyhow::Result<T>>,
38{
39    let mut last_err: Option<anyhow::Error> = None;
40
41    for attempt in 1..=2 {
42        tracing::info!(attempt, "orchestrator server lazy init attempt");
43        match f(attempt).await {
44            Ok(v) => {
45                if attempt > 1 {
46                    tracing::info!(
47                        attempt,
48                        "orchestrator server lazy init succeeded after retry"
49                    );
50                }
51                return Ok(v);
52            }
53            Err(e) => {
54                tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
55                last_err = Some(e);
56            }
57        }
58    }
59
60    tracing::error!("orchestrator server lazy init exhausted retries");
61    // Safety: The loop always runs at least once and sets last_err on failure
62    match last_err {
63        Some(e) => Err(e),
64        None => anyhow::bail!("init_with_retry: unexpected empty error state"),
65    }
66}
67
/// Key for looking up model context limits: (`provider_id`, `model_id`)
///
/// Owned `String`s so the tuple can be stored directly as a `HashMap` key.
pub type ModelKey = (String, String);
70
/// Shared state wrapping the managed `OpenCode` server and HTTP client.
pub struct OrchestratorServer {
    /// Keep alive for lifecycle; Drop kills the opencode serve process.
    /// `None` when using an external client (e.g., wiremock tests).
    _managed: Option<ManagedServer>,
    /// HTTP client for `OpenCode` API
    client: Client,
    /// Cached model context limits from GET /provider.
    /// Empty if the lookup failed at startup (best-effort load).
    model_context_limits: HashMap<ModelKey, u64>,
    /// Base URL of the managed server (normalized: no trailing slash)
    base_url: String,
    /// Orchestrator configuration (session timeouts, compaction threshold)
    config: OrchestratorConfig,
}
85
86impl OrchestratorServer {
87    /// Start a new managed `OpenCode` server and build the client.
88    ///
89    /// This is the eager initialization path that spawns the server immediately.
90    /// Prefer `start_lazy()` for deferred initialization.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the server fails to start or the client cannot be built.
95    #[allow(clippy::allow_attributes, dead_code)]
96    pub async fn start() -> anyhow::Result<Arc<Self>> {
97        Ok(Arc::new(Self::start_impl().await?))
98    }
99
100    /// Lazy initialization path for `OnceCell` usage.
101    ///
102    /// Checks the recursion guard env var first, then uses retry logic.
103    /// Returns `Self` (not `Arc<Self>`) for direct storage in `OnceCell`.
104    ///
105    /// # Errors
106    ///
107    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
108    /// Returns an error if the server fails to start after 2 attempts.
109    pub async fn start_lazy() -> anyhow::Result<Self> {
110        if managed_guard_enabled() {
111            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
112        }
113
114        init_with_retry(|_attempt| async { Self::start_impl().await }).await
115    }
116
117    /// Internal implementation that actually spawns the server.
118    async fn start_impl() -> anyhow::Result<Self> {
119        // Load configuration (best-effort, use defaults if unavailable)
120        let cwd = std::env::current_dir().unwrap_or_default();
121        let config = match agentic_config::loader::load_merged(&cwd) {
122            Ok(loaded) => {
123                for w in &loaded.warnings {
124                    tracing::warn!("{w}");
125                }
126                loaded.config.orchestrator
127            }
128            Err(e) => {
129                tracing::warn!("Failed to load config, using defaults: {e}");
130                OrchestratorConfig::default()
131            }
132        };
133
134        let managed = ManagedServer::start(ServerOptions::default())
135            .await
136            .context("Failed to start embedded `opencode serve`")?;
137
138        // Avoid trailing slash to prevent `//event` formatting
139        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
140
141        let directory = std::env::current_dir()
142            .ok()
143            .map(|p| p.to_string_lossy().to_string());
144
145        let mut builder = Client::builder().base_url(&base_url);
146        if let Some(dir) = &directory {
147            builder = builder.directory(dir.clone());
148        }
149
150        let client = builder
151            .build()
152            .context("Failed to build opencode-rs HTTP client")?;
153
154        // Load model context limits (best-effort, don't fail if unavailable)
155        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
156            tracing::warn!("Failed to load model limits: {}", e);
157            HashMap::new()
158        });
159
160        tracing::info!("Loaded {} model context limits", model_context_limits.len());
161
162        Ok(Self {
163            _managed: Some(managed),
164            client,
165            model_context_limits,
166            base_url,
167            config,
168        })
169    }
170
171    /// Get the HTTP client.
172    pub fn client(&self) -> &Client {
173        &self.client
174    }
175
176    /// Get the base URL of the managed server.
177    #[allow(clippy::allow_attributes, dead_code)]
178    pub fn base_url(&self) -> &str {
179        &self.base_url
180    }
181
182    /// Look up context limit for a specific model.
183    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
184        self.model_context_limits
185            .get(&(provider_id.to_string(), model_id.to_string()))
186            .copied()
187    }
188
189    /// Get the session deadline duration.
190    pub fn session_deadline(&self) -> Duration {
191        Duration::from_secs(self.config.session_deadline_secs)
192    }
193
194    /// Get the inactivity timeout duration.
195    pub fn inactivity_timeout(&self) -> Duration {
196        Duration::from_secs(self.config.inactivity_timeout_secs)
197    }
198
199    /// Get the compaction threshold (0.0 - 1.0).
200    pub fn compaction_threshold(&self) -> f64 {
201        self.config.compaction_threshold
202    }
203
204    /// Load model context limits from GET /provider.
205    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
206        let resp: ProviderListResponse = client.providers().list().await?;
207        let mut limits = HashMap::new();
208
209        for provider in resp.all {
210            for (model_id, model) in provider.models {
211                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
212                    limits.insert((provider.id.clone(), model_id), limit);
213                }
214            }
215        }
216
217        Ok(limits)
218    }
219
220    /// Extract text content from the last assistant message.
221    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
222        // Find the last assistant message
223        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
224
225        // Join all text parts
226        let text: String = assistant_msg
227            .parts
228            .iter()
229            .filter_map(|p| {
230                if let Part::Text { text, .. } = p {
231                    Some(text.as_str())
232                } else {
233                    None
234                }
235            })
236            .collect::<Vec<_>>()
237            .join("");
238
239        if text.trim().is_empty() {
240            None
241        } else {
242            Some(text)
243        }
244    }
245}
246
/// Test support utilities (requires `test-support` feature).
///
/// These functions may appear unused when compiling non-test targets because
/// cargo's feature unification enables the feature for all targets when tests
/// are compiled. The `dead_code` warning is expected and suppressed.
#[cfg(feature = "test-support")]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Build an `OrchestratorServer` wrapper around an existing client.
    ///
    /// Does NOT manage an opencode process (intended for wiremock tests).
    /// Model context limits are not loaded and will return `None` for all lookups.
    pub fn from_client(client: Client, base_url: impl Into<String>) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url);
        Arc::new(server)
    }

    /// Build an `OrchestratorServer` wrapper returning `Self` (not `Arc<Self>`).
    ///
    /// Useful for tests that need to populate an `OnceCell` directly.
    pub fn from_client_unshared(client: Client, base_url: impl Into<String>) -> Self {
        // Normalize the URL the same way the managed path does: no trailing slash.
        let raw: String = base_url.into();
        let normalized = raw.trim_end_matches('/').to_string();

        Self {
            _managed: None,
            client,
            model_context_limits: HashMap::new(),
            base_url: normalized,
            config: OrchestratorConfig::default(),
        }
    }
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::sync::OnceLock;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    /// Mutex to serialize env var tests (env vars are process-global).
    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

    /// Acquire the process-global env lock for the duration of a test.
    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
    }

    /// Set the guard env var to `value`, evaluate the guard, then clean up.
    ///
    /// Callers must hold the lock from `env_guard()` while calling this.
    fn guard_state_with_env(value: &str) -> bool {
        // SAFETY: Test runs with ENV_LOCK held, ensuring no concurrent env modification
        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, value) };
        let enabled = managed_guard_enabled();
        // SAFETY: Test runs with ENV_LOCK held, ensuring no concurrent env modification
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
        enabled
    }

    #[tokio::test]
    async fn init_with_retry_succeeds_on_first_attempt() {
        let calls = AtomicUsize::new(0);

        let value: u32 = init_with_retry(|_| {
            let seen = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                // Always succeed
                assert_eq!(seen, 0, "should only be called once on success");
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn init_with_retry_retries_once_and_succeeds() {
        let calls = AtomicUsize::new(0);

        let value: u32 = init_with_retry(|_| {
            let seen = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                if seen == 0 {
                    anyhow::bail!("fail first");
                }
                Ok(42)
            }
        })
        .await
        .unwrap();

        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn init_with_retry_fails_after_two_attempts() {
        let calls = AtomicUsize::new(0);

        let err = init_with_retry::<(), _, _>(|_| {
            calls.fetch_add(1, Ordering::SeqCst);
            async { anyhow::bail!("always fail") }
        })
        .await
        .unwrap_err();

        assert!(err.to_string().contains("always fail"));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn managed_guard_disabled_when_env_not_set() {
        let _guard = env_guard();

        // Ensure the env var is not set
        // SAFETY: Test runs with ENV_LOCK held, ensuring no concurrent env modification
        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
        assert!(!managed_guard_enabled());
    }

    #[test]
    fn managed_guard_enabled_when_env_is_1() {
        let _guard = env_guard();
        assert!(guard_state_with_env("1"));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_0() {
        let _guard = env_guard();
        assert!(!guard_state_with_env("0"));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_empty() {
        let _guard = env_guard();
        assert!(!guard_state_with_env(""));
    }

    #[test]
    fn managed_guard_disabled_when_env_is_whitespace() {
        let _guard = env_guard();
        assert!(!guard_state_with_env("   "));
    }

    #[test]
    fn managed_guard_enabled_when_env_is_truthy() {
        let _guard = env_guard();
        assert!(guard_state_with_env("true"));
    }
}