Skip to main content

opencode_orchestrator_mcp/
server.rs

1//! Shared orchestrator server state.
2//!
3//! Wraps `ManagedServer` + `Client` + cached model context limits + config.
4
5use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::sync::Mutex as StdMutex;
17use std::time::Duration;
18use tokio::sync::Mutex as AsyncMutex;
19use tokio::sync::RwLock;
20
21use crate::error::OrchestratorError;
22use crate::version;
23
/// Environment variable name for the orchestrator-managed recursion guard.
///
/// Read by `managed_guard_enabled`, which decides whether the guard is active
/// from the variable's value.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";
26
/// User-facing message returned when orchestrator tools are invoked in a nested context.
///
/// `OrchestratorServer::start_lazy_with_config` fails with this text when the
/// recursion guard is enabled, before any server start is attempted.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
     in a nested orchestration session. Consult a human for assistance or try to accomplish your \
     task without the orchestration tools.";
31
32/// Check if the orchestrator-managed env var is set (guard enabled).
33pub fn managed_guard_enabled() -> bool {
34    match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
35        Ok(v) => v != "0" && !v.trim().is_empty(),
36        Err(_) => false,
37    }
38}
39
40/// Retry an async init operation once (2 total attempts) with tracing logs.
41pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
42where
43    F: FnMut(usize) -> Fut,
44    Fut: std::future::Future<Output = anyhow::Result<T>>,
45{
46    let mut last_err: Option<anyhow::Error> = None;
47
48    for attempt in 1..=2 {
49        tracing::info!(attempt, "orchestrator server lazy init attempt");
50        match f(attempt).await {
51            Ok(v) => {
52                if attempt > 1 {
53                    tracing::info!(
54                        attempt,
55                        "orchestrator server lazy init succeeded after retry"
56                    );
57                }
58                return Ok(v);
59            }
60            Err(e) => {
61                tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
62                last_err = Some(e);
63            }
64        }
65    }
66
67    tracing::error!("orchestrator server lazy init exhausted retries");
68    // Safety: The loop always runs at least once and sets last_err on failure
69    match last_err {
70        Some(e) => Err(e),
71        None => anyhow::bail!("init_with_retry: unexpected empty error state"),
72    }
73}
74
/// Key for looking up model context limits: (`provider_id`, `model_id`).
///
/// Both components are owned strings; `OrchestratorServer::context_limit`
/// builds a key from borrowed ids for each lookup.
pub type ModelKey = (String, String);
77
/// Outcome of validating a cached server snapshot at tool entry.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ServerEntryState {
    /// The snapshot passed validation and can be handed to the caller.
    Healthy,
    /// The snapshot failed validation; `reason` is a human-readable explanation
    /// that is logged and stored in the handle state.
    NeedsRecovery { reason: String },
}
83
/// How the handle reacts when a cached server snapshot becomes unusable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// The server owns a managed child process; the handle restarts it on failure.
    Managed,
    /// The server has no managed child; the handle cannot restart it and reports
    /// an external-server-unavailable error instead.
    External,
}
89
/// Result of checking a command against the configured allow/deny lists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandPolicyDecision {
    /// The command may run.
    Allowed,
    /// A non-empty allowlist exists and the command is not on it.
    DeniedByAllowlist,
    /// The command matches a denylist entry (checked before the allowlist).
    DeniedByDenylist,
}
96
97impl CommandPolicyDecision {
98    #[must_use]
99    pub fn is_allowed(self) -> bool {
100        matches!(self, Self::Allowed)
101    }
102}
103
104impl RecoveryMode {
105    fn as_str(self) -> &'static str {
106        match self {
107            Self::Managed => "managed",
108            Self::External => "external",
109        }
110    }
111}
112
/// Lifecycle state of the snapshot cached inside `OrchestratorServerHandle`.
enum HandleState {
    /// No server has been started through this handle yet.
    Empty,
    /// A snapshot is cached; it is re-validated on every acquisition.
    Ready {
        snapshot: Arc<OrchestratorServer>,
        mode: RecoveryMode,
    },
    /// The cached snapshot failed validation and has not been replaced yet.
    /// `reason` records why it was invalidated.
    Stale {
        snapshot: Arc<OrchestratorServer>,
        mode: RecoveryMode,
        reason: String,
    },
    /// The last start or recovery attempt failed. `base_url` is the URL of the
    /// snapshot that was being replaced, when there was one.
    Failed {
        mode: RecoveryMode,
        base_url: Option<String>,
        error: String,
    },
}
130
/// Upper bound on the `/global/health` probe performed when validating a
/// cached server at tool entry.
const TOOL_ENTRY_HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(5);
132
/// Shared recoverable handle for the process-global orchestrator server snapshot.
///
/// Callers obtain an `Arc<OrchestratorServer>` through `acquire`; the handle
/// validates the cached snapshot on each call and replaces it when needed.
pub struct OrchestratorServerHandle {
    /// Current lifecycle state. An async mutex is used because the guard is
    /// held across `.await` points while a server is being started.
    state: AsyncMutex<HandleState>,
}
137
138impl Default for OrchestratorServerHandle {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl OrchestratorServerHandle {
145    #[must_use]
146    pub fn new() -> Self {
147        Self {
148            state: AsyncMutex::new(HandleState::Empty),
149        }
150    }
151
152    /// Acquire a live orchestrator server snapshot for a tool entry.
153    ///
154    /// Existing callers keep their previously acquired `Arc<OrchestratorServer>`
155    /// even if this handle later replaces the cached snapshot during recovery.
156    pub async fn acquire(&self) -> anyhow::Result<Arc<OrchestratorServer>> {
157        self.get_or_recover_with(OrchestratorServer::start_lazy)
158            .await
159    }
160
161    async fn get_or_recover_with<F, Fut>(
162        &self,
163        mut start: F,
164    ) -> anyhow::Result<Arc<OrchestratorServer>>
165    where
166        F: FnMut() -> Fut,
167        Fut: std::future::Future<Output = anyhow::Result<OrchestratorServer>>,
168    {
169        loop {
170            let ready_snapshot = {
171                let mut state = self.state.lock().await;
172
173                match &mut *state {
174                    HandleState::Empty => {
175                        tracing::info!(
176                            "orchestrator server missing cached snapshot; starting embedded server"
177                        );
178
179                        match start().await {
180                            Ok(server) => {
181                                let rebuilt_mode = if server.is_managed() {
182                                    RecoveryMode::Managed
183                                } else {
184                                    RecoveryMode::External
185                                };
186                                let rebuilt = Arc::new(server);
187                                trace_state_transition(
188                                    "Empty",
189                                    "Ready",
190                                    "initialization",
191                                    rebuilt_mode,
192                                    Some(rebuilt.base_url()),
193                                );
194                                *state = HandleState::Ready {
195                                    snapshot: Arc::clone(&rebuilt),
196                                    mode: rebuilt_mode,
197                                };
198                                return Ok(rebuilt);
199                            }
200                            Err(error) => {
201                                let reason = error.to_string();
202                                trace_state_transition(
203                                    "Empty",
204                                    "Failed",
205                                    &reason,
206                                    RecoveryMode::Managed,
207                                    None,
208                                );
209                                *state = HandleState::Failed {
210                                    mode: RecoveryMode::Managed,
211                                    base_url: None,
212                                    error: reason,
213                                };
214                                return Err(error);
215                            }
216                        }
217                    }
218                    HandleState::Ready { snapshot, mode } => Some((Arc::clone(snapshot), *mode)),
219                    HandleState::Stale {
220                        snapshot,
221                        mode,
222                        reason,
223                    } => match mode {
224                        RecoveryMode::Managed => {
225                            let stale_reason = reason.clone();
226                            match start().await {
227                                Ok(server) => {
228                                    let rebuilt_mode = if server.is_managed() {
229                                        RecoveryMode::Managed
230                                    } else {
231                                        RecoveryMode::External
232                                    };
233                                    let rebuilt = Arc::new(server);
234                                    trace_state_transition(
235                                        "Stale",
236                                        "Ready",
237                                        &stale_reason,
238                                        rebuilt_mode,
239                                        Some(rebuilt.base_url()),
240                                    );
241                                    *state = HandleState::Ready {
242                                        snapshot: Arc::clone(&rebuilt),
243                                        mode: rebuilt_mode,
244                                    };
245                                    return Ok(rebuilt);
246                                }
247                                Err(error) => {
248                                    let failure = error.to_string();
249                                    trace_state_transition(
250                                        "Stale",
251                                        "Failed",
252                                        &failure,
253                                        *mode,
254                                        Some(snapshot.base_url()),
255                                    );
256                                    *state = HandleState::Failed {
257                                        mode: *mode,
258                                        base_url: Some(snapshot.base_url().to_string()),
259                                        error: failure,
260                                    };
261                                    return Err(error);
262                                }
263                            }
264                        }
265                        RecoveryMode::External => {
266                            let base_url = snapshot.base_url().to_string();
267                            let stale_reason = reason.clone();
268                            trace_state_transition(
269                                "Stale",
270                                "Failed",
271                                &stale_reason,
272                                *mode,
273                                Some(&base_url),
274                            );
275                            *state = HandleState::Failed {
276                                mode: *mode,
277                                base_url: Some(base_url.clone()),
278                                error: stale_reason.clone(),
279                            };
280                            return Err(external_unavailable(Some(base_url), stale_reason));
281                        }
282                    },
283                    HandleState::Failed {
284                        mode,
285                        base_url,
286                        error,
287                    } => match mode {
288                        RecoveryMode::Managed => match start().await {
289                            Ok(server) => {
290                                let rebuilt_mode = if server.is_managed() {
291                                    RecoveryMode::Managed
292                                } else {
293                                    RecoveryMode::External
294                                };
295                                let rebuilt = Arc::new(server);
296                                trace_state_transition(
297                                    "Failed",
298                                    "Ready",
299                                    error,
300                                    rebuilt_mode,
301                                    Some(rebuilt.base_url()),
302                                );
303                                *state = HandleState::Ready {
304                                    snapshot: Arc::clone(&rebuilt),
305                                    mode: rebuilt_mode,
306                                };
307                                return Ok(rebuilt);
308                            }
309                            Err(start_error) => {
310                                let failure = start_error.to_string();
311                                error.clone_from(&failure);
312                                return Err(start_error);
313                            }
314                        },
315                        RecoveryMode::External => {
316                            return Err(external_unavailable(base_url.clone(), error.clone()));
317                        }
318                    },
319                }
320            };
321
322            let Some((snapshot, mode)) = ready_snapshot else {
323                continue;
324            };
325
326            let validation = snapshot.validate_for_tool_entry().await?;
327
328            let mut state = self.state.lock().await;
329            let HandleState::Ready {
330                snapshot: current,
331                mode: current_mode,
332            } = &*state
333            else {
334                continue;
335            };
336
337            if !Arc::ptr_eq(current, &snapshot) || *current_mode != mode {
338                continue;
339            }
340
341            match validation {
342                ServerEntryState::Healthy => return Ok(snapshot),
343                ServerEntryState::NeedsRecovery { reason } => {
344                    trace_cache_invalidated(&reason, mode, Some(snapshot.base_url()));
345
346                    match mode {
347                        RecoveryMode::Managed => {
348                            tracing::warn!(reason = %reason, "cached orchestrator server failed liveness check; rebuilding");
349                            trace_state_transition(
350                                "Ready",
351                                "Stale",
352                                &reason,
353                                mode,
354                                Some(snapshot.base_url()),
355                            );
356                            *state = HandleState::Stale {
357                                snapshot: Arc::clone(&snapshot),
358                                mode,
359                                reason: reason.clone(),
360                            };
361
362                            match start().await {
363                                Ok(server) => {
364                                    let rebuilt_mode = if server.is_managed() {
365                                        RecoveryMode::Managed
366                                    } else {
367                                        RecoveryMode::External
368                                    };
369                                    let rebuilt = Arc::new(server);
370                                    trace_state_transition(
371                                        "Stale",
372                                        "Ready",
373                                        &reason,
374                                        rebuilt_mode,
375                                        Some(rebuilt.base_url()),
376                                    );
377                                    *state = HandleState::Ready {
378                                        snapshot: Arc::clone(&rebuilt),
379                                        mode: rebuilt_mode,
380                                    };
381                                    return Ok(rebuilt);
382                                }
383                                Err(error) => {
384                                    let failure = error.to_string();
385                                    trace_state_transition(
386                                        "Stale",
387                                        "Failed",
388                                        &failure,
389                                        mode,
390                                        Some(snapshot.base_url()),
391                                    );
392                                    *state = HandleState::Failed {
393                                        mode,
394                                        base_url: Some(snapshot.base_url().to_string()),
395                                        error: failure,
396                                    };
397                                    return Err(error);
398                                }
399                            }
400                        }
401                        RecoveryMode::External => {
402                            let base_url = snapshot.base_url().to_string();
403                            trace_state_transition(
404                                "Ready",
405                                "Failed",
406                                &reason,
407                                mode,
408                                Some(&base_url),
409                            );
410                            *state = HandleState::Failed {
411                                mode,
412                                base_url: Some(base_url.clone()),
413                                error: reason.clone(),
414                            };
415                            return Err(external_unavailable(Some(base_url), reason));
416                        }
417                    }
418                }
419            }
420        }
421    }
422
423    #[cfg(any(test, feature = "test-support"))]
424    #[must_use]
425    pub fn from_server_unshared(server: OrchestratorServer) -> Self {
426        let mode = if server.is_managed() {
427            RecoveryMode::Managed
428        } else {
429            RecoveryMode::External
430        };
431
432        Self {
433            state: AsyncMutex::new(HandleState::Ready {
434                snapshot: Arc::new(server),
435                mode,
436            }),
437        }
438    }
439
440    #[cfg(any(test, feature = "test-support"))]
441    pub async fn acquire_or_recover_with<F, Fut>(
442        &self,
443        start: F,
444    ) -> anyhow::Result<Arc<OrchestratorServer>>
445    where
446        F: FnMut() -> Fut,
447        Fut: std::future::Future<Output = anyhow::Result<OrchestratorServer>>,
448    {
449        self.get_or_recover_with(start).await
450    }
451}
452
453fn trace_cache_invalidated(reason: &str, mode: RecoveryMode, base_url: Option<&str>) {
454    if let Some(base_url) = base_url {
455        tracing::info!(
456            event = "cache_invalidated",
457            reason = %reason,
458            mode = mode.as_str(),
459            base_url = %base_url,
460        );
461    } else {
462        tracing::info!(
463            event = "cache_invalidated",
464            reason = %reason,
465            mode = mode.as_str(),
466        );
467    }
468}
469
470fn trace_state_transition(
471    from: &'static str,
472    to: &'static str,
473    reason: &str,
474    mode: RecoveryMode,
475    base_url: Option<&str>,
476) {
477    if let Some(base_url) = base_url {
478        tracing::info!(
479            event = "state_transition",
480            from,
481            to,
482            reason = %reason,
483            mode = mode.as_str(),
484            base_url = %base_url,
485        );
486    } else {
487        tracing::info!(
488            event = "state_transition",
489            from,
490            to,
491            reason = %reason,
492            mode = mode.as_str(),
493        );
494    }
495}
496
497fn external_unavailable(base_url: Option<String>, reason: String) -> anyhow::Error {
498    OrchestratorError::ExternalServerUnavailable {
499        base_url: base_url.unwrap_or_else(|| "<unknown>".to_string()),
500        reason,
501    }
502    .into()
503}
504
/// Shared state wrapping the managed `OpenCode` server and HTTP client.
pub struct OrchestratorServer {
    /// Keep alive for lifecycle; Drop kills the opencode serve process.
    /// `None` when using an external client (e.g., wiremock tests).
    /// Wrapped in a mutex because the liveness check needs mutable access
    /// through `&self`.
    managed_server: StdMutex<Option<ManagedServer>>,
    /// HTTP client for `OpenCode` API
    client: Client,
    /// Cached model context limits from GET /provider, keyed by
    /// (`provider_id`, `model_id`).
    model_context_limits: HashMap<ModelKey, u64>,
    /// Base URL the client talks to.
    base_url: String,
    /// Orchestrator configuration (session timeouts, compaction threshold,
    /// command allow/deny lists)
    config: OrchestratorConfig,
    /// Session IDs created by this orchestrator instance.
    spawned_sessions: Arc<RwLock<HashSet<String>>>,
}
521
522impl OrchestratorServer {
523    pub fn command_policy_decision(&self, command: &str) -> CommandPolicyDecision {
524        let deny_matches = self
525            .config
526            .commands
527            .deny
528            .iter()
529            .map(String::as_str)
530            .map(str::trim)
531            .filter(|entry| !entry.is_empty())
532            .any(|entry| entry == command);
533        if deny_matches {
534            return CommandPolicyDecision::DeniedByDenylist;
535        }
536
537        let mut allow_entries = self
538            .config
539            .commands
540            .allow
541            .iter()
542            .map(String::as_str)
543            .map(str::trim)
544            .filter(|entry| !entry.is_empty())
545            .peekable();
546
547        if allow_entries.peek().is_some() && !allow_entries.any(|entry| entry == command) {
548            return CommandPolicyDecision::DeniedByAllowlist;
549        }
550
551        CommandPolicyDecision::Allowed
552    }
553
554    pub fn is_command_allowed(&self, command: &str) -> bool {
555        self.command_policy_decision(command).is_allowed()
556    }
557
558    /// Start a new managed `OpenCode` server and build the client.
559    ///
560    /// This is the eager initialization path that spawns the server immediately.
561    /// Prefer `start_lazy()` for deferred initialization.
562    ///
563    /// # Errors
564    ///
565    /// Returns an error if the server fails to start or the client cannot be built.
566    #[allow(clippy::allow_attributes, dead_code)]
567    pub async fn start() -> anyhow::Result<Arc<Self>> {
568        Ok(Arc::new(Self::start_impl().await?))
569    }
570
571    /// Lazy initialization path for `OnceCell` usage.
572    ///
573    /// Checks the recursion guard env var first, then uses retry logic.
574    /// Returns `Self` (not `Arc<Self>`) for direct storage in `OnceCell`.
575    ///
576    /// # Errors
577    ///
578    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
579    /// Returns an error if the server fails to start after 2 attempts.
580    pub async fn start_lazy() -> anyhow::Result<Self> {
581        Self::start_lazy_with_config(None).await
582    }
583
584    /// Start the orchestrator server lazily with optional config injection.
585    ///
586    /// # Arguments
587    ///
588    /// * `config_json` - Optional JSON config to inject via `OPENCODE_CONFIG_CONTENT`
589    ///
590    /// # Errors
591    ///
592    /// Returns the guard message if `OPENCODE_ORCHESTRATOR_MANAGED` is set.
593    /// Returns an error if the server fails to start after 2 attempts.
594    pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
595        if managed_guard_enabled() {
596            anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
597        }
598
599        init_with_retry(|_attempt| {
600            let cfg = config_json.clone();
601            async move { Self::start_impl_with_config(cfg).await }
602        })
603        .await
604    }
605
606    /// Internal implementation that actually spawns the server.
607    async fn start_impl() -> anyhow::Result<Self> {
608        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
609
610        // Load configuration (best-effort, use defaults if unavailable)
611        let config = match agentic_config::loader::load_merged(&cwd) {
612            Ok(loaded) => {
613                for w in &loaded.warnings {
614                    tracing::warn!("{w}");
615                }
616                loaded.config.orchestrator
617            }
618            Err(e) => {
619                tracing::warn!("Failed to load config, using defaults: {e}");
620                OrchestratorConfig::default()
621            }
622        };
623
624        let launcher_config = version::resolve_launcher_config(&cwd)
625            .context("Failed to resolve OpenCode launcher configuration")?;
626
627        tracing::info!(
628            binary = %launcher_config.binary,
629            launcher_args = ?launcher_config.launcher_args,
630            expected_version = %version::PINNED_OPENCODE_VERSION,
631            "starting embedded opencode serve (pinned stable)"
632        );
633
634        let opts = ServerOptions::default()
635            .binary(&launcher_config.binary)
636            .launcher_args(launcher_config.launcher_args)
637            .directory(cwd.clone());
638
639        let managed = ManagedServer::start(opts)
640            .await
641            .context("Failed to start embedded `opencode serve`")?;
642
643        // Avoid trailing slash to prevent `//event` formatting
644        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
645
646        let client = Client::builder()
647            .base_url(&base_url)
648            .directory(cwd.to_string_lossy().to_string())
649            .build()
650            .context("Failed to build opencode-rs HTTP client")?;
651
652        let health = client
653            .misc()
654            .health()
655            .await
656            .context("Failed to fetch /global/health for version validation")?;
657
658        version::validate_exact_version(health.version.as_deref()).with_context(|| {
659            format!(
660                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
661                version::PINNED_OPENCODE_VERSION,
662                launcher_config.binary
663            )
664        })?;
665
666        // Load model context limits (best-effort, don't fail if unavailable)
667        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
668            tracing::warn!("Failed to load model limits: {}", e);
669            HashMap::new()
670        });
671
672        tracing::info!("Loaded {} model context limits", model_context_limits.len());
673
674        Ok(Self {
675            managed_server: StdMutex::new(Some(managed)),
676            client,
677            model_context_limits,
678            base_url,
679            config,
680            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
681        })
682    }
683
684    /// Internal implementation with optional config injection.
685    async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
686        let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
687
688        // Load configuration (best-effort, use defaults if unavailable)
689        let config = match agentic_config::loader::load_merged(&cwd) {
690            Ok(loaded) => {
691                for w in &loaded.warnings {
692                    tracing::warn!("{w}");
693                }
694                loaded.config.orchestrator
695            }
696            Err(e) => {
697                tracing::warn!("Failed to load config, using defaults: {e}");
698                OrchestratorConfig::default()
699            }
700        };
701
702        let launcher_config = version::resolve_launcher_config(&cwd)
703            .context("Failed to resolve OpenCode launcher configuration")?;
704
705        tracing::info!(
706            binary = %launcher_config.binary,
707            launcher_args = ?launcher_config.launcher_args,
708            expected_version = %version::PINNED_OPENCODE_VERSION,
709            config_injected = config_json.is_some(),
710            "starting embedded opencode serve (pinned stable)"
711        );
712
713        let mut opts = ServerOptions::default()
714            .binary(&launcher_config.binary)
715            .launcher_args(launcher_config.launcher_args)
716            .directory(cwd.clone());
717
718        // Inject config if provided
719        if let Some(cfg) = config_json {
720            opts = opts.config_json(cfg);
721        }
722
723        let managed = ManagedServer::start(opts)
724            .await
725            .context("Failed to start embedded `opencode serve`")?;
726
727        // Avoid trailing slash to prevent `//event` formatting
728        let base_url = managed.url().to_string().trim_end_matches('/').to_string();
729
730        let client = Client::builder()
731            .base_url(&base_url)
732            .directory(cwd.to_string_lossy().to_string())
733            .build()
734            .context("Failed to build opencode-rs HTTP client")?;
735
736        let health = client
737            .misc()
738            .health()
739            .await
740            .context("Failed to fetch /global/health for version validation")?;
741
742        version::validate_exact_version(health.version.as_deref()).with_context(|| {
743            format!(
744                "Embedded OpenCode server did not match pinned stable v{} (binary={})",
745                version::PINNED_OPENCODE_VERSION,
746                launcher_config.binary
747            )
748        })?;
749
750        // Load model context limits (best-effort, don't fail if unavailable)
751        let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
752            tracing::warn!("Failed to load model limits: {}", e);
753            HashMap::new()
754        });
755
756        tracing::info!("Loaded {} model context limits", model_context_limits.len());
757
758        Ok(Self {
759            managed_server: StdMutex::new(Some(managed)),
760            client,
761            model_context_limits,
762            base_url,
763            config,
764            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
765        })
766    }
767
    /// Borrow the HTTP client used to talk to the `OpenCode` API.
    pub fn client(&self) -> &Client {
        &self.client
    }
    /// Base URL recorded for this server.
    ///
    /// For servers started by this module the URL has any trailing slash removed.
    // NOTE(review): `dead_code` is allowed here although the handle code in this
    // file calls this method; presumably needed for some build configuration — confirm.
    #[allow(clippy::allow_attributes, dead_code)]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
778
779    /// Look up context limit for a specific model.
780    pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
781        self.model_context_limits
782            .get(&(provider_id.to_string(), model_id.to_string()))
783            .copied()
784    }
785
    /// Session deadline as a `Duration`, built from the configured
    /// `session_deadline_secs` (whole seconds).
    pub fn session_deadline(&self) -> Duration {
        Duration::from_secs(self.config.session_deadline_secs)
    }
790
    /// Inactivity timeout as a `Duration`, built from the configured
    /// `inactivity_timeout_secs` (whole seconds).
    pub fn inactivity_timeout(&self) -> Duration {
        Duration::from_secs(self.config.inactivity_timeout_secs)
    }
795
    /// Get the configured compaction threshold (0.0 - 1.0).
    ///
    /// The value is returned as configured; no range check is applied here.
    pub fn compaction_threshold(&self) -> f64 {
        self.config.compaction_threshold
    }
800
    /// Returns the shared set of session IDs created by this orchestrator instance.
    ///
    /// The set sits behind an async `RwLock`; callers lock it themselves to
    /// read or update it.
    pub fn spawned_sessions(&self) -> &Arc<RwLock<HashSet<String>>> {
        &self.spawned_sessions
    }
805
806    fn managed_server_lock(&self) -> std::sync::MutexGuard<'_, Option<ManagedServer>> {
807        self.managed_server
808            .lock()
809            .unwrap_or_else(std::sync::PoisonError::into_inner)
810    }
811
812    fn is_managed(&self) -> bool {
813        self.managed_server_lock().is_some()
814    }
815
816    async fn validate_for_tool_entry(&self) -> anyhow::Result<ServerEntryState> {
817        self.validate_for_tool_entry_with_timeout(TOOL_ENTRY_HEALTH_PROBE_TIMEOUT)
818            .await
819    }
820
821    async fn validate_for_tool_entry_with_timeout(
822        &self,
823        health_probe_timeout: Duration,
824    ) -> anyhow::Result<ServerEntryState> {
825        if self.is_managed() {
826            let is_running = {
827                let mut managed = self.managed_server_lock();
828                managed
829                    .as_mut()
830                    .is_some_and(opencode_rs::server::ManagedServer::is_running)
831            };
832
833            if !is_running {
834                return Ok(ServerEntryState::NeedsRecovery {
835                    reason: "managed child is no longer running".to_string(),
836                });
837            }
838        }
839
840        match tokio::time::timeout(health_probe_timeout, self.client.misc().health()).await {
841            Ok(Ok(health)) if health.healthy => Ok(ServerEntryState::Healthy),
842            Ok(Ok(_health)) => Ok(ServerEntryState::NeedsRecovery {
843                reason: "/global/health reported unhealthy".to_string(),
844            }),
845            Ok(Err(error)) => Ok(ServerEntryState::NeedsRecovery {
846                reason: format!("/global/health probe failed: {error}"),
847            }),
848            Err(_elapsed) => Ok(ServerEntryState::NeedsRecovery {
849                reason: format!("/global/health probe timed out after {health_probe_timeout:?}"),
850            }),
851        }
852    }
853
854    /// Load model context limits from GET /provider.
855    async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
856        let resp: ProviderListResponse = client.providers().list().await?;
857        let mut limits = HashMap::new();
858
859        for provider in resp.all {
860            for (model_id, model) in provider.models {
861                if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
862                    limits.insert((provider.id.clone(), model_id), limit);
863                }
864            }
865        }
866
867        Ok(limits)
868    }
869
870    /// Extract text content from the last assistant message.
871    pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
872        // Find the last assistant message
873        let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
874
875        // Join all text parts
876        let text: String = assistant_msg
877            .parts
878            .iter()
879            .filter_map(|p| {
880                if let Part::Text { text, .. } = p {
881                    Some(text.as_str())
882                } else {
883                    None
884                }
885            })
886            .collect::<Vec<_>>()
887            .join("");
888
889        if text.trim().is_empty() {
890            None
891        } else {
892            Some(text)
893        }
894    }
895}
896
/// Test support utilities (requires `test-support` feature).
///
/// Some of these constructors can look unused when a non-test target is built
/// with the feature enabled (cargo feature unification), so `dead_code` is
/// suppressed for the whole impl.
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Wrap an existing client in a shared `OrchestratorServer` with default config.
    ///
    /// No opencode process is attached (intended for wiremock tests), and the
    /// model context limit cache starts empty, so every lookup returns `None`.
    pub fn from_client(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
    ) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url, mode);
        Arc::new(server)
    }

    /// Same as [`Self::from_client`], but with a caller-supplied config.
    pub fn from_client_with_config(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
        config: OrchestratorConfig,
    ) -> Arc<Self> {
        let server = Self::from_client_unshared_with_config(client, base_url, mode, config);
        Arc::new(server)
    }

    /// Wrap an existing client, returning `Self` rather than `Arc<Self>`.
    ///
    /// Handy for tests that preseed an `OrchestratorServerHandle` directly.
    pub fn from_client_unshared(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
    ) -> Self {
        let config = OrchestratorConfig::default();
        Self::from_client_unshared_with_config(client, base_url, mode, config)
    }

    /// Wrap an existing client with a caller-supplied config and no managed server.
    ///
    /// Trailing `/` characters are stripped from `base_url`. The recovery mode
    /// argument is accepted for signature parity and is not stored.
    pub fn from_client_unshared_with_config(
        client: Client,
        base_url: impl Into<String>,
        _mode: RecoveryMode,
        config: OrchestratorConfig,
    ) -> Self {
        let raw_url: String = base_url.into();
        let base_url = raw_url.trim_end_matches('/').to_owned();

        Self {
            managed_server: StdMutex::new(None),
            client,
            model_context_limits: HashMap::new(),
            base_url,
            config,
            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Wrap an already-constructed `ManagedServer` using the default config.
    pub fn from_managed_for_testing(
        managed: ManagedServer,
        client: Client,
        base_url: impl Into<String>,
    ) -> Self {
        let config = OrchestratorConfig::default();
        Self::from_managed_for_testing_with_config(managed, client, base_url, config)
    }

    /// Wrap an already-constructed `ManagedServer` with a caller-supplied config.
    ///
    /// Trailing `/` characters are stripped from `base_url`; the model context
    /// limit cache starts empty.
    pub fn from_managed_for_testing_with_config(
        managed: ManagedServer,
        client: Client,
        base_url: impl Into<String>,
        config: OrchestratorConfig,
    ) -> Self {
        let raw_url: String = base_url.into();
        let base_url = raw_url.trim_end_matches('/').to_owned();

        Self {
            managed_server: StdMutex::new(Some(managed)),
            client,
            model_context_limits: HashMap::new(),
            base_url,
            config,
            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Detach the managed server from this instance and stop it.
    ///
    /// # Errors
    ///
    /// Fails when no managed server is attached, or when stopping it fails.
    pub async fn stop_managed_for_testing(&self) -> anyhow::Result<()> {
        // Take the server out in its own statement so the std mutex guard is
        // released before awaiting the stop call.
        let detached = self.managed_server_lock().take();

        let Some(managed) = detached else {
            anyhow::bail!("no managed server is attached to this snapshot");
        };

        managed.stop().await?;
        Ok(())
    }
}
1001
1002#[cfg(test)]
1003mod tests {
1004    use super::*;
1005    use agentic_config::types::OrchestratorCommandsConfig;
1006    use serial_test::serial;
1007    use std::sync::Arc;
1008    use std::sync::atomic::AtomicBool;
1009    use std::sync::atomic::AtomicUsize;
1010    use std::sync::atomic::Ordering;
1011    use std::time::Duration;
1012    use std::time::Instant;
1013    use tokio::io::AsyncReadExt;
1014    use tokio::io::AsyncWriteExt;
1015    use tokio::net::TcpListener;
1016    use tokio::process::Command;
1017    use tokio::sync::Notify;
1018    use wiremock::Mock;
1019    use wiremock::MockServer;
1020    use wiremock::ResponseTemplate;
1021    use wiremock::matchers::method;
1022    use wiremock::matchers::path;
1023
1024    struct ManagedEnvGuard {
1025        previous: Option<std::ffi::OsString>,
1026    }
1027
1028    impl ManagedEnvGuard {
1029        fn new() -> Self {
1030            Self {
1031                previous: std::env::var_os(OPENCODE_ORCHESTRATOR_MANAGED_ENV),
1032            }
1033        }
1034    }
1035
1036    impl Drop for ManagedEnvGuard {
1037        fn drop(&mut self) {
1038            match &self.previous {
1039                // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1040                Some(value) => unsafe {
1041                    std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, value);
1042                },
1043                // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1044                None => unsafe {
1045                    std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV);
1046                },
1047            }
1048        }
1049    }
1050
1051    async fn health_mock_server() -> MockServer {
1052        let mock = MockServer::start().await;
1053        Mock::given(method("GET"))
1054            .and(path("/global/health"))
1055            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1056                "healthy": true,
1057                "version": version::PINNED_OPENCODE_VERSION,
1058            })))
1059            .mount(&mock)
1060            .await;
1061        mock
1062    }
1063
1064    fn test_client(base_url: &str) -> Client {
1065        opencode_rs::ClientBuilder::new()
1066            .base_url(base_url)
1067            .timeout_secs(5)
1068            .build()
1069            .unwrap()
1070    }
1071
1072    fn external_server(base_url: &str) -> OrchestratorServer {
1073        OrchestratorServer::from_client_unshared(
1074            test_client(base_url),
1075            base_url,
1076            RecoveryMode::External,
1077        )
1078    }
1079
1080    fn external_server_with_config(
1081        base_url: &str,
1082        config: OrchestratorConfig,
1083    ) -> OrchestratorServer {
1084        OrchestratorServer::from_client_unshared_with_config(
1085            test_client(base_url),
1086            base_url,
1087            RecoveryMode::External,
1088            config,
1089        )
1090    }
1091
1092    async fn exited_child() -> tokio::process::Child {
1093        let mut child = Command::new("sh").arg("-c").arg("exit 0").spawn().unwrap();
1094        let _status = child.wait().await.unwrap();
1095        child
1096    }
1097
1098    async fn managed_server_with_exited_child(base_url: &str) -> OrchestratorServer {
1099        let managed = ManagedServer::from_child_for_testing(exited_child().await, base_url, 9);
1100        OrchestratorServer::from_managed_for_testing(managed, test_client(base_url), base_url)
1101    }
1102
1103    #[test]
1104    fn command_policy_allows_all_when_allowlist_is_empty() {
1105        let server = external_server_with_config(
1106            "http://127.0.0.1:9",
1107            OrchestratorConfig {
1108                commands: OrchestratorCommandsConfig {
1109                    allow: vec![],
1110                    deny: vec!["blocked".into()],
1111                },
1112                ..OrchestratorConfig::default()
1113            },
1114        );
1115
1116        assert_eq!(
1117            server.command_policy_decision("plan"),
1118            CommandPolicyDecision::Allowed
1119        );
1120        assert_eq!(
1121            server.command_policy_decision("blocked"),
1122            CommandPolicyDecision::DeniedByDenylist
1123        );
1124    }
1125
1126    #[test]
1127    fn command_policy_trims_entries_and_deny_wins() {
1128        let server = external_server_with_config(
1129            "http://127.0.0.1:9",
1130            OrchestratorConfig {
1131                commands: OrchestratorCommandsConfig {
1132                    allow: vec!["  plan  ".into()],
1133                    deny: vec!["plan".into()],
1134                },
1135                ..OrchestratorConfig::default()
1136            },
1137        );
1138
1139        assert_eq!(
1140            server.command_policy_decision("plan"),
1141            CommandPolicyDecision::DeniedByDenylist
1142        );
1143    }
1144
1145    #[test]
1146    fn command_policy_matching_is_case_sensitive() {
1147        let server = external_server_with_config(
1148            "http://127.0.0.1:9",
1149            OrchestratorConfig {
1150                commands: OrchestratorCommandsConfig {
1151                    allow: vec!["Plan".into()],
1152                    deny: vec!["blocked".into()],
1153                },
1154                ..OrchestratorConfig::default()
1155            },
1156        );
1157
1158        assert_eq!(
1159            server.command_policy_decision("Plan"),
1160            CommandPolicyDecision::Allowed
1161        );
1162        assert_eq!(
1163            server.command_policy_decision("plan"),
1164            CommandPolicyDecision::DeniedByAllowlist
1165        );
1166        assert!(server.is_command_allowed("Plan"));
1167        assert!(!server.is_command_allowed("plan"));
1168    }
1169
1170    struct BlockingHealthServer {
1171        base_url: String,
1172        started_requests: Arc<AtomicUsize>,
1173        started_notify: Arc<Notify>,
1174        released: Arc<AtomicBool>,
1175        release_notify: Arc<Notify>,
1176        task: tokio::task::JoinHandle<()>,
1177    }
1178
1179    impl BlockingHealthServer {
1180        async fn start(expected_requests: usize) -> Self {
1181            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1182            let addr = listener.local_addr().unwrap();
1183            let started_requests = Arc::new(AtomicUsize::new(0));
1184            let started_notify = Arc::new(Notify::new());
1185            let released = Arc::new(AtomicBool::new(false));
1186            let release_notify = Arc::new(Notify::new());
1187            let body = format!(
1188                r#"{{"healthy":true,"version":"{}"}}"#,
1189                version::PINNED_OPENCODE_VERSION
1190            );
1191            let response = Arc::new(format!(
1192                "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
1193                body.len(),
1194                body
1195            ));
1196
1197            let task = tokio::spawn({
1198                let started_requests = Arc::clone(&started_requests);
1199                let started_notify = Arc::clone(&started_notify);
1200                let released = Arc::clone(&released);
1201                let release_notify = Arc::clone(&release_notify);
1202                let response = Arc::clone(&response);
1203
1204                async move {
1205                    let mut connections = Vec::with_capacity(expected_requests);
1206
1207                    for _ in 0..expected_requests {
1208                        let (mut stream, _addr) = listener.accept().await.unwrap();
1209                        let started_requests = Arc::clone(&started_requests);
1210                        let started_notify = Arc::clone(&started_notify);
1211                        let released = Arc::clone(&released);
1212                        let release_notify = Arc::clone(&release_notify);
1213                        let response = Arc::clone(&response);
1214
1215                        connections.push(tokio::spawn(async move {
1216                            let mut request = [0_u8; 1024];
1217                            let _read = stream.read(&mut request).await.unwrap();
1218                            started_requests.fetch_add(1, Ordering::SeqCst);
1219                            started_notify.notify_waiters();
1220
1221                            loop {
1222                                let notified = release_notify.notified();
1223                                if released.load(Ordering::SeqCst) {
1224                                    break;
1225                                }
1226                                notified.await;
1227                            }
1228
1229                            stream.write_all(response.as_bytes()).await.unwrap();
1230                            stream.shutdown().await.unwrap();
1231                        }));
1232                    }
1233
1234                    for connection in connections {
1235                        connection.await.unwrap();
1236                    }
1237                }
1238            });
1239
1240            Self {
1241                base_url: format!("http://{addr}"),
1242                started_requests,
1243                started_notify,
1244                released,
1245                release_notify,
1246                task,
1247            }
1248        }
1249
1250        async fn wait_for_requests(&self, expected_requests: usize) {
1251            tokio::time::timeout(Duration::from_secs(1), async {
1252                while self.started_requests.load(Ordering::SeqCst) < expected_requests {
1253                    self.started_notify.notified().await;
1254                }
1255            })
1256            .await
1257            .unwrap();
1258        }
1259
1260        fn release(&self) {
1261            self.released.store(true, Ordering::SeqCst);
1262            self.release_notify.notify_waiters();
1263        }
1264    }
1265
1266    impl Drop for BlockingHealthServer {
1267        fn drop(&mut self) {
1268            self.release();
1269            self.task.abort();
1270        }
1271    }
1272
1273    #[tokio::test]
1274    async fn init_with_retry_succeeds_on_first_attempt() {
1275        let attempts = AtomicUsize::new(0);
1276
1277        let result: u32 = init_with_retry(|_| {
1278            let n = attempts.fetch_add(1, Ordering::SeqCst);
1279            async move {
1280                // Always succeed
1281                assert_eq!(n, 0, "should only be called once on success");
1282                Ok(42)
1283            }
1284        })
1285        .await
1286        .unwrap();
1287
1288        assert_eq!(result, 42);
1289        assert_eq!(attempts.load(Ordering::SeqCst), 1);
1290    }
1291
1292    #[tokio::test]
1293    async fn init_with_retry_retries_once_and_succeeds() {
1294        let attempts = AtomicUsize::new(0);
1295
1296        let result: u32 = init_with_retry(|_| {
1297            let n = attempts.fetch_add(1, Ordering::SeqCst);
1298            async move {
1299                if n == 0 {
1300                    anyhow::bail!("fail first");
1301                }
1302                Ok(42)
1303            }
1304        })
1305        .await
1306        .unwrap();
1307
1308        assert_eq!(result, 42);
1309        assert_eq!(attempts.load(Ordering::SeqCst), 2);
1310    }
1311
1312    #[tokio::test]
1313    async fn init_with_retry_fails_after_two_attempts() {
1314        let attempts = AtomicUsize::new(0);
1315
1316        let err = init_with_retry::<(), _, _>(|_| {
1317            attempts.fetch_add(1, Ordering::SeqCst);
1318            async { anyhow::bail!("always fail") }
1319        })
1320        .await
1321        .unwrap_err();
1322
1323        assert!(err.to_string().contains("always fail"));
1324        assert_eq!(attempts.load(Ordering::SeqCst), 2);
1325    }
1326
1327    #[tokio::test]
1328    async fn handle_serializes_initialization_and_reuses_snapshot() {
1329        let mock = health_mock_server().await;
1330        let base_url = mock.uri();
1331        let handle = Arc::new(OrchestratorServerHandle::new());
1332        let starts = Arc::new(AtomicUsize::new(0));
1333
1334        let first = {
1335            let handle = Arc::clone(&handle);
1336            let starts = Arc::clone(&starts);
1337            let base_url = base_url.clone();
1338            tokio::spawn(async move {
1339                handle
1340                    .get_or_recover_with(|| {
1341                        let starts = Arc::clone(&starts);
1342                        let base_url = base_url.clone();
1343                        async move {
1344                            starts.fetch_add(1, Ordering::SeqCst);
1345                            tokio::time::sleep(Duration::from_millis(50)).await;
1346                            Ok(external_server(&base_url))
1347                        }
1348                    })
1349                    .await
1350            })
1351        };
1352
1353        let second = {
1354            let handle = Arc::clone(&handle);
1355            let starts = Arc::clone(&starts);
1356            let base_url = base_url.clone();
1357            tokio::spawn(async move {
1358                handle
1359                    .get_or_recover_with(|| {
1360                        let starts = Arc::clone(&starts);
1361                        let base_url = base_url.clone();
1362                        async move {
1363                            starts.fetch_add(1, Ordering::SeqCst);
1364                            Ok(external_server(&base_url))
1365                        }
1366                    })
1367                    .await
1368            })
1369        };
1370
1371        let first = first.await.unwrap().unwrap();
1372        let second = second.await.unwrap().unwrap();
1373
1374        assert_eq!(starts.load(Ordering::SeqCst), 1);
1375        assert!(Arc::ptr_eq(&first, &second));
1376    }
1377
1378    #[tokio::test]
1379    async fn validate_for_tool_entry_uses_health_for_external_server() {
1380        let mock = health_mock_server().await;
1381        let server = external_server(&mock.uri());
1382
1383        let state = server.validate_for_tool_entry().await.unwrap();
1384
1385        assert_eq!(state, ServerEntryState::Healthy);
1386        let requests = mock.received_requests().await.unwrap();
1387        assert!(
1388            requests
1389                .iter()
1390                .any(|request| request.url.path() == "/global/health"),
1391            "expected /global/health request"
1392        );
1393    }
1394
1395    #[tokio::test]
1396    async fn validate_for_tool_entry_times_out_health_probe() {
1397        let mock = MockServer::start().await;
1398        Mock::given(method("GET"))
1399            .and(path("/global/health"))
1400            .respond_with(
1401                ResponseTemplate::new(200)
1402                    .set_delay(Duration::from_secs(30))
1403                    .set_body_json(serde_json::json!({
1404                        "healthy": true,
1405                        "version": version::PINNED_OPENCODE_VERSION,
1406                    })),
1407            )
1408            .mount(&mock)
1409            .await;
1410        let server = external_server(&mock.uri());
1411
1412        let state = server
1413            .validate_for_tool_entry_with_timeout(Duration::from_millis(25))
1414            .await
1415            .unwrap();
1416
1417        assert_eq!(
1418            state,
1419            ServerEntryState::NeedsRecovery {
1420                reason: "/global/health probe timed out after 25ms".to_string(),
1421            }
1422        );
1423    }
1424
1425    #[tokio::test]
1426    async fn validate_for_tool_entry_short_circuits_dead_managed_server() {
1427        let server = managed_server_with_exited_child("http://127.0.0.1:9").await;
1428
1429        let state = server.validate_for_tool_entry().await.unwrap();
1430
1431        assert_eq!(
1432            state,
1433            ServerEntryState::NeedsRecovery {
1434                reason: "managed child is no longer running".to_string(),
1435            }
1436        );
1437    }
1438
1439    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1440    async fn handle_allows_concurrent_healthy_acquires_without_serializing_validation() {
1441        let health = BlockingHealthServer::start(3).await;
1442        let handle = Arc::new(OrchestratorServerHandle::from_server_unshared(
1443            external_server(&health.base_url),
1444        ));
1445
1446        let started_at = Instant::now();
1447        let tasks = (0..3)
1448            .map(|_| {
1449                let handle = Arc::clone(&handle);
1450                tokio::spawn(async move { handle.acquire().await })
1451            })
1452            .collect::<Vec<_>>();
1453
1454        health.wait_for_requests(3).await;
1455        tokio::time::sleep(Duration::from_millis(75)).await;
1456        health.release();
1457
1458        let mut snapshots = Vec::with_capacity(tasks.len());
1459        for task in tasks {
1460            snapshots.push(task.await.unwrap().unwrap());
1461        }
1462
1463        assert!(
1464            started_at.elapsed() < Duration::from_millis(250),
1465            "healthy acquires should overlap rather than serialize"
1466        );
1467        assert!(Arc::ptr_eq(&snapshots[0], &snapshots[1]));
1468        assert!(Arc::ptr_eq(&snapshots[1], &snapshots[2]));
1469    }
1470
1471    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1472    async fn handle_single_flights_concurrent_stale_acquires() {
1473        let stale = Arc::new(managed_server_with_exited_child("http://127.0.0.1:9").await);
1474        let handle = Arc::new(OrchestratorServerHandle {
1475            state: AsyncMutex::new(HandleState::Ready {
1476                snapshot: Arc::clone(&stale),
1477                mode: RecoveryMode::Managed,
1478            }),
1479        });
1480        let mock = health_mock_server().await;
1481        let base_url = mock.uri();
1482        let starts = Arc::new(AtomicUsize::new(0));
1483
1484        let tasks = (0..3)
1485            .map(|_| {
1486                let handle = Arc::clone(&handle);
1487                let starts = Arc::clone(&starts);
1488                let base_url = base_url.clone();
1489                tokio::spawn(async move {
1490                    handle
1491                        .get_or_recover_with(|| {
1492                            let starts = Arc::clone(&starts);
1493                            let base_url = base_url.clone();
1494                            async move {
1495                                starts.fetch_add(1, Ordering::SeqCst);
1496                                tokio::time::sleep(Duration::from_millis(50)).await;
1497                                Ok(external_server(&base_url))
1498                            }
1499                        })
1500                        .await
1501                })
1502            })
1503            .collect::<Vec<_>>();
1504
1505        let mut snapshots = Vec::with_capacity(tasks.len());
1506        for task in tasks {
1507            snapshots.push(task.await.unwrap().unwrap());
1508        }
1509
1510        assert_eq!(starts.load(Ordering::SeqCst), 1);
1511        assert!(!Arc::ptr_eq(&stale, &snapshots[0]));
1512        assert!(Arc::ptr_eq(&snapshots[0], &snapshots[1]));
1513        assert!(Arc::ptr_eq(&snapshots[1], &snapshots[2]));
1514    }
1515
1516    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1517    async fn handle_retries_if_cache_changes_while_validating() {
1518        let old_health = BlockingHealthServer::start(1).await;
1519        let original = Arc::new(external_server(&old_health.base_url));
1520        let handle = Arc::new(OrchestratorServerHandle {
1521            state: AsyncMutex::new(HandleState::Ready {
1522                snapshot: Arc::clone(&original),
1523                mode: RecoveryMode::External,
1524            }),
1525        });
1526        let replacement_mock = health_mock_server().await;
1527        let replacement = Arc::new(external_server(&replacement_mock.uri()));
1528
1529        let acquire = {
1530            let handle = Arc::clone(&handle);
1531            tokio::spawn(async move {
1532                handle
1533                    .acquire_or_recover_with(|| async { anyhow::bail!("should not rebuild") })
1534                    .await
1535            })
1536        };
1537
1538        old_health.wait_for_requests(1).await;
1539
1540        {
1541            let mut state = tokio::time::timeout(Duration::from_millis(100), handle.state.lock())
1542                .await
1543                .expect("validation should not hold the handle mutex");
1544            *state = HandleState::Ready {
1545                snapshot: Arc::clone(&replacement),
1546                mode: RecoveryMode::External,
1547            };
1548        }
1549
1550        old_health.release();
1551
1552        let snapshot = acquire.await.unwrap().unwrap();
1553
1554        assert!(!Arc::ptr_eq(&snapshot, &original));
1555        assert!(Arc::ptr_eq(&snapshot, &replacement));
1556    }
1557
1558    #[tokio::test]
1559    async fn handle_rebuilds_without_invalidating_held_snapshot() {
1560        let stale = Arc::new(managed_server_with_exited_child("http://127.0.0.1:9").await);
1561        let handle = OrchestratorServerHandle {
1562            state: AsyncMutex::new(HandleState::Ready {
1563                snapshot: Arc::clone(&stale),
1564                mode: RecoveryMode::Managed,
1565            }),
1566        };
1567        let mock = health_mock_server().await;
1568        let base_url = mock.uri();
1569        let starts = Arc::new(AtomicUsize::new(0));
1570
1571        let rebuilt = handle
1572            .get_or_recover_with(|| {
1573                let starts = Arc::clone(&starts);
1574                let base_url = base_url.clone();
1575                async move {
1576                    starts.fetch_add(1, Ordering::SeqCst);
1577                    Ok(external_server(&base_url))
1578                }
1579            })
1580            .await
1581            .unwrap();
1582
1583        assert_eq!(starts.load(Ordering::SeqCst), 1);
1584        assert!(!Arc::ptr_eq(&stale, &rebuilt));
1585        assert_eq!(stale.base_url(), "http://127.0.0.1:9");
1586        assert_eq!(rebuilt.base_url(), base_url.trim_end_matches('/'));
1587    }
1588
1589    #[test]
1590    #[serial(env)]
1591    fn managed_guard_disabled_when_env_not_set() {
1592        let _env = ManagedEnvGuard::new();
1593        // Ensure the env var is not set
1594        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1595        unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
1596        assert!(!managed_guard_enabled());
1597    }
1598
1599    #[test]
1600    #[serial(env)]
1601    fn managed_guard_enabled_when_env_is_1() {
1602        let _env = ManagedEnvGuard::new();
1603        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1604        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
1605        assert!(managed_guard_enabled());
1606    }
1607
1608    #[test]
1609    #[serial(env)]
1610    fn managed_guard_disabled_when_env_is_0() {
1611        let _env = ManagedEnvGuard::new();
1612        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1613        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
1614        assert!(!managed_guard_enabled());
1615    }
1616
1617    #[test]
1618    #[serial(env)]
1619    fn managed_guard_disabled_when_env_is_empty() {
1620        let _env = ManagedEnvGuard::new();
1621        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1622        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
1623        assert!(!managed_guard_enabled());
1624    }
1625
1626    #[test]
1627    #[serial(env)]
1628    fn managed_guard_disabled_when_env_is_whitespace() {
1629        let _env = ManagedEnvGuard::new();
1630        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1631        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "   ") };
1632        assert!(!managed_guard_enabled());
1633    }
1634
1635    #[test]
1636    #[serial(env)]
1637    fn managed_guard_enabled_when_env_is_truthy() {
1638        let _env = ManagedEnvGuard::new();
1639        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1640        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
1641        assert!(managed_guard_enabled());
1642    }
1643
1644    #[tokio::test]
1645    #[serial(env)]
1646    async fn recursion_guard_only_blocks_real_startup_paths() {
1647        let _env = ManagedEnvGuard::new();
1648        // SAFETY: Test serialized by #[serial(env)], preventing concurrent env access.
1649        unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
1650
1651        let mock = health_mock_server().await;
1652        let handle = OrchestratorServerHandle::from_server_unshared(external_server(&mock.uri()));
1653        let reused = handle
1654            .get_or_recover_with(|| async { anyhow::bail!("should not start") })
1655            .await
1656            .unwrap();
1657        assert_eq!(reused.base_url(), mock.uri().trim_end_matches('/'));
1658
1659        let fresh_handle = OrchestratorServerHandle::new();
1660        let err = match fresh_handle.acquire().await {
1661            Ok(_server) => panic!("expected recursion guard to block fresh startup"),
1662            Err(error) => error,
1663        };
1664        assert!(err.to_string().contains(ORCHESTRATOR_MANAGED_GUARD_MESSAGE));
1665    }
1666
1667    #[tokio::test]
1668    async fn external_failure_becomes_sticky_and_typed() {
1669        let handle = OrchestratorServerHandle::from_server_unshared(
1670            OrchestratorServer::from_client_unshared(
1671                test_client("http://127.0.0.1:9"),
1672                "http://127.0.0.1:9",
1673                RecoveryMode::External,
1674            ),
1675        );
1676        let starts = AtomicUsize::new(0);
1677
1678        let first = handle
1679            .acquire_or_recover_with(|| {
1680                starts.fetch_add(1, Ordering::SeqCst);
1681                async { anyhow::bail!("should not rebuild external servers") }
1682            })
1683            .await;
1684        let second = handle
1685            .acquire_or_recover_with(|| {
1686                starts.fetch_add(1, Ordering::SeqCst);
1687                async { anyhow::bail!("should not rebuild external servers") }
1688            })
1689            .await;
1690
1691        let first = match first {
1692            Ok(_snapshot) => panic!("expected typed external failure on first acquire"),
1693            Err(error) => error,
1694        };
1695        let second = match second {
1696            Ok(_snapshot) => panic!("expected sticky external failure on second acquire"),
1697            Err(error) => error,
1698        };
1699
1700        assert_eq!(starts.load(Ordering::SeqCst), 0);
1701        assert!(
1702            first
1703                .to_string()
1704                .contains("External OpenCode server unavailable"),
1705            "expected typed external failure, got: {first}"
1706        );
1707        assert_eq!(first.to_string(), second.to_string());
1708    }
1709}