1use agentic_config::types::OrchestratorConfig;
6use anyhow::Context;
7use opencode_rs::Client;
8use opencode_rs::server::ManagedServer;
9use opencode_rs::server::ServerOptions;
10use opencode_rs::types::message::Message;
11use opencode_rs::types::message::Part;
12use opencode_rs::types::provider::ProviderListResponse;
13use std::collections::HashMap;
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::sync::Mutex as StdMutex;
17use std::time::Duration;
18use tokio::sync::Mutex as AsyncMutex;
19use tokio::sync::RwLock;
20
21use crate::error::OrchestratorError;
22use crate::version;
23
/// Name of the environment variable inspected by [`managed_guard_enabled`] to detect a
/// nested orchestration session.
pub const OPENCODE_ORCHESTRATOR_MANAGED_ENV: &str = "OPENCODE_ORCHESTRATOR_MANAGED";

/// Error text used to refuse starting the embedded server when [`managed_guard_enabled`]
/// reports that this process is already running inside an orchestration session.
// The `\` line continuations strip the newline and the next line's leading whitespace, so
// the runtime value is a single line of text.
pub const ORCHESTRATOR_MANAGED_GUARD_MESSAGE: &str = "ENV VAR OPENCODE_ORCHESTRATOR_MANAGED is set to 1. This most commonly happens when you're \
    in a nested orchestration session. Consult a human for assistance or try to accomplish your \
    task without the orchestration tools.";
31
32pub fn managed_guard_enabled() -> bool {
34 match std::env::var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) {
35 Ok(v) => v != "0" && !v.trim().is_empty(),
36 Err(_) => false,
37 }
38}
39
40pub async fn init_with_retry<T, F, Fut>(mut f: F) -> anyhow::Result<T>
42where
43 F: FnMut(usize) -> Fut,
44 Fut: std::future::Future<Output = anyhow::Result<T>>,
45{
46 let mut last_err: Option<anyhow::Error> = None;
47
48 for attempt in 1..=2 {
49 tracing::info!(attempt, "orchestrator server lazy init attempt");
50 match f(attempt).await {
51 Ok(v) => {
52 if attempt > 1 {
53 tracing::info!(
54 attempt,
55 "orchestrator server lazy init succeeded after retry"
56 );
57 }
58 return Ok(v);
59 }
60 Err(e) => {
61 tracing::warn!(attempt, error = %e, "orchestrator server lazy init failed");
62 last_err = Some(e);
63 }
64 }
65 }
66
67 tracing::error!("orchestrator server lazy init exhausted retries");
68 match last_err {
70 Some(e) => Err(e),
71 None => anyhow::bail!("init_with_retry: unexpected empty error state"),
72 }
73}
74
/// Lookup key for per-model context limits: `(provider_id, model_id)`.
pub type ModelKey = (String, String);

/// Result of probing a cached server before it is handed to a tool call.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ServerEntryState {
    /// The server passed its liveness checks.
    Healthy,
    /// The server failed a liveness check; `reason` describes which one.
    NeedsRecovery { reason: String },
}

/// How a cached server may be recovered after it stops responding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryMode {
    /// This process owns the server's child process and may start a replacement.
    Managed,
    /// The server was not spawned by this process and cannot be restarted from here.
    External,
}

impl RecoveryMode {
    /// Stable lowercase label used as the `mode` field in structured trace events.
    fn as_str(self) -> &'static str {
        match self {
            Self::External => "external",
            Self::Managed => "managed",
        }
    }
}

/// Outcome of checking a command name against the configured allow/deny lists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandPolicyDecision {
    /// The command passed both lists.
    Allowed,
    /// A non-empty allowlist is configured and does not name the command.
    DeniedByAllowlist,
    /// The command is named on the denylist.
    DeniedByDenylist,
}

impl CommandPolicyDecision {
    /// Returns `true` only for [`CommandPolicyDecision::Allowed`].
    #[must_use]
    pub fn is_allowed(self) -> bool {
        self == Self::Allowed
    }
}
112
113enum HandleState {
114 Empty,
115 Ready {
116 snapshot: Arc<OrchestratorServer>,
117 mode: RecoveryMode,
118 },
119 Stale {
120 snapshot: Arc<OrchestratorServer>,
121 mode: RecoveryMode,
122 reason: String,
123 },
124 Failed {
125 mode: RecoveryMode,
126 base_url: Option<String>,
127 error: String,
128 },
129}
130
131const TOOL_ENTRY_HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(5);
132
133pub struct OrchestratorServerHandle {
135 state: AsyncMutex<HandleState>,
136}
137
138impl Default for OrchestratorServerHandle {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl OrchestratorServerHandle {
145 #[must_use]
146 pub fn new() -> Self {
147 Self {
148 state: AsyncMutex::new(HandleState::Empty),
149 }
150 }
151
152 pub async fn acquire(&self) -> anyhow::Result<Arc<OrchestratorServer>> {
157 self.get_or_recover_with(OrchestratorServer::start_lazy)
158 .await
159 }
160
161 async fn get_or_recover_with<F, Fut>(
162 &self,
163 mut start: F,
164 ) -> anyhow::Result<Arc<OrchestratorServer>>
165 where
166 F: FnMut() -> Fut,
167 Fut: std::future::Future<Output = anyhow::Result<OrchestratorServer>>,
168 {
169 loop {
170 let ready_snapshot = {
171 let mut state = self.state.lock().await;
172
173 match &mut *state {
174 HandleState::Empty => {
175 tracing::info!(
176 "orchestrator server missing cached snapshot; starting embedded server"
177 );
178
179 match start().await {
180 Ok(server) => {
181 let rebuilt_mode = if server.is_managed() {
182 RecoveryMode::Managed
183 } else {
184 RecoveryMode::External
185 };
186 let rebuilt = Arc::new(server);
187 trace_state_transition(
188 "Empty",
189 "Ready",
190 "initialization",
191 rebuilt_mode,
192 Some(rebuilt.base_url()),
193 );
194 *state = HandleState::Ready {
195 snapshot: Arc::clone(&rebuilt),
196 mode: rebuilt_mode,
197 };
198 return Ok(rebuilt);
199 }
200 Err(error) => {
201 let reason = error.to_string();
202 trace_state_transition(
203 "Empty",
204 "Failed",
205 &reason,
206 RecoveryMode::Managed,
207 None,
208 );
209 *state = HandleState::Failed {
210 mode: RecoveryMode::Managed,
211 base_url: None,
212 error: reason,
213 };
214 return Err(error);
215 }
216 }
217 }
218 HandleState::Ready { snapshot, mode } => Some((Arc::clone(snapshot), *mode)),
219 HandleState::Stale {
220 snapshot,
221 mode,
222 reason,
223 } => match mode {
224 RecoveryMode::Managed => {
225 let stale_reason = reason.clone();
226 match start().await {
227 Ok(server) => {
228 let rebuilt_mode = if server.is_managed() {
229 RecoveryMode::Managed
230 } else {
231 RecoveryMode::External
232 };
233 let rebuilt = Arc::new(server);
234 trace_state_transition(
235 "Stale",
236 "Ready",
237 &stale_reason,
238 rebuilt_mode,
239 Some(rebuilt.base_url()),
240 );
241 *state = HandleState::Ready {
242 snapshot: Arc::clone(&rebuilt),
243 mode: rebuilt_mode,
244 };
245 return Ok(rebuilt);
246 }
247 Err(error) => {
248 let failure = error.to_string();
249 trace_state_transition(
250 "Stale",
251 "Failed",
252 &failure,
253 *mode,
254 Some(snapshot.base_url()),
255 );
256 *state = HandleState::Failed {
257 mode: *mode,
258 base_url: Some(snapshot.base_url().to_string()),
259 error: failure,
260 };
261 return Err(error);
262 }
263 }
264 }
265 RecoveryMode::External => {
266 let base_url = snapshot.base_url().to_string();
267 let stale_reason = reason.clone();
268 trace_state_transition(
269 "Stale",
270 "Failed",
271 &stale_reason,
272 *mode,
273 Some(&base_url),
274 );
275 *state = HandleState::Failed {
276 mode: *mode,
277 base_url: Some(base_url.clone()),
278 error: stale_reason.clone(),
279 };
280 return Err(external_unavailable(Some(base_url), stale_reason));
281 }
282 },
283 HandleState::Failed {
284 mode,
285 base_url,
286 error,
287 } => match mode {
288 RecoveryMode::Managed => match start().await {
289 Ok(server) => {
290 let rebuilt_mode = if server.is_managed() {
291 RecoveryMode::Managed
292 } else {
293 RecoveryMode::External
294 };
295 let rebuilt = Arc::new(server);
296 trace_state_transition(
297 "Failed",
298 "Ready",
299 error,
300 rebuilt_mode,
301 Some(rebuilt.base_url()),
302 );
303 *state = HandleState::Ready {
304 snapshot: Arc::clone(&rebuilt),
305 mode: rebuilt_mode,
306 };
307 return Ok(rebuilt);
308 }
309 Err(start_error) => {
310 let failure = start_error.to_string();
311 error.clone_from(&failure);
312 return Err(start_error);
313 }
314 },
315 RecoveryMode::External => {
316 return Err(external_unavailable(base_url.clone(), error.clone()));
317 }
318 },
319 }
320 };
321
322 let Some((snapshot, mode)) = ready_snapshot else {
323 continue;
324 };
325
326 let validation = snapshot.validate_for_tool_entry().await?;
327
328 let mut state = self.state.lock().await;
329 let HandleState::Ready {
330 snapshot: current,
331 mode: current_mode,
332 } = &*state
333 else {
334 continue;
335 };
336
337 if !Arc::ptr_eq(current, &snapshot) || *current_mode != mode {
338 continue;
339 }
340
341 match validation {
342 ServerEntryState::Healthy => return Ok(snapshot),
343 ServerEntryState::NeedsRecovery { reason } => {
344 trace_cache_invalidated(&reason, mode, Some(snapshot.base_url()));
345
346 match mode {
347 RecoveryMode::Managed => {
348 tracing::warn!(reason = %reason, "cached orchestrator server failed liveness check; rebuilding");
349 trace_state_transition(
350 "Ready",
351 "Stale",
352 &reason,
353 mode,
354 Some(snapshot.base_url()),
355 );
356 *state = HandleState::Stale {
357 snapshot: Arc::clone(&snapshot),
358 mode,
359 reason: reason.clone(),
360 };
361
362 match start().await {
363 Ok(server) => {
364 let rebuilt_mode = if server.is_managed() {
365 RecoveryMode::Managed
366 } else {
367 RecoveryMode::External
368 };
369 let rebuilt = Arc::new(server);
370 trace_state_transition(
371 "Stale",
372 "Ready",
373 &reason,
374 rebuilt_mode,
375 Some(rebuilt.base_url()),
376 );
377 *state = HandleState::Ready {
378 snapshot: Arc::clone(&rebuilt),
379 mode: rebuilt_mode,
380 };
381 return Ok(rebuilt);
382 }
383 Err(error) => {
384 let failure = error.to_string();
385 trace_state_transition(
386 "Stale",
387 "Failed",
388 &failure,
389 mode,
390 Some(snapshot.base_url()),
391 );
392 *state = HandleState::Failed {
393 mode,
394 base_url: Some(snapshot.base_url().to_string()),
395 error: failure,
396 };
397 return Err(error);
398 }
399 }
400 }
401 RecoveryMode::External => {
402 let base_url = snapshot.base_url().to_string();
403 trace_state_transition(
404 "Ready",
405 "Failed",
406 &reason,
407 mode,
408 Some(&base_url),
409 );
410 *state = HandleState::Failed {
411 mode,
412 base_url: Some(base_url.clone()),
413 error: reason.clone(),
414 };
415 return Err(external_unavailable(Some(base_url), reason));
416 }
417 }
418 }
419 }
420 }
421 }
422
423 #[cfg(any(test, feature = "test-support"))]
424 #[must_use]
425 pub fn from_server_unshared(server: OrchestratorServer) -> Self {
426 let mode = if server.is_managed() {
427 RecoveryMode::Managed
428 } else {
429 RecoveryMode::External
430 };
431
432 Self {
433 state: AsyncMutex::new(HandleState::Ready {
434 snapshot: Arc::new(server),
435 mode,
436 }),
437 }
438 }
439
440 #[cfg(any(test, feature = "test-support"))]
441 pub async fn acquire_or_recover_with<F, Fut>(
442 &self,
443 start: F,
444 ) -> anyhow::Result<Arc<OrchestratorServer>>
445 where
446 F: FnMut() -> Fut,
447 Fut: std::future::Future<Output = anyhow::Result<OrchestratorServer>>,
448 {
449 self.get_or_recover_with(start).await
450 }
451}
452
453fn trace_cache_invalidated(reason: &str, mode: RecoveryMode, base_url: Option<&str>) {
454 if let Some(base_url) = base_url {
455 tracing::info!(
456 event = "cache_invalidated",
457 reason = %reason,
458 mode = mode.as_str(),
459 base_url = %base_url,
460 );
461 } else {
462 tracing::info!(
463 event = "cache_invalidated",
464 reason = %reason,
465 mode = mode.as_str(),
466 );
467 }
468}
469
470fn trace_state_transition(
471 from: &'static str,
472 to: &'static str,
473 reason: &str,
474 mode: RecoveryMode,
475 base_url: Option<&str>,
476) {
477 if let Some(base_url) = base_url {
478 tracing::info!(
479 event = "state_transition",
480 from,
481 to,
482 reason = %reason,
483 mode = mode.as_str(),
484 base_url = %base_url,
485 );
486 } else {
487 tracing::info!(
488 event = "state_transition",
489 from,
490 to,
491 reason = %reason,
492 mode = mode.as_str(),
493 );
494 }
495}
496
497fn external_unavailable(base_url: Option<String>, reason: String) -> anyhow::Error {
498 OrchestratorError::ExternalServerUnavailable {
499 base_url: base_url.unwrap_or_else(|| "<unknown>".to_string()),
500 reason,
501 }
502 .into()
503}
504
/// An OpenCode server (spawned here or supplied externally) plus the orchestrator state kept for it.
pub struct OrchestratorServer {
    /// Child-process handle when this process spawned the server; `None` otherwise (or once detached).
    // Field order is also drop order; keep `managed_server` first.
    managed_server: StdMutex<Option<ManagedServer>>,
    /// HTTP client pointed at `base_url`.
    client: Client,
    /// Context-window sizes keyed by `(provider_id, model_id)`; populated at start-up.
    model_context_limits: HashMap<ModelKey, u64>,
    /// Server URL with any trailing `/` removed.
    base_url: String,
    /// Orchestrator section of the merged agentic configuration.
    config: OrchestratorConfig,
    /// Shared set of session IDs; presumably sessions spawned by the orchestrator (populated elsewhere).
    spawned_sessions: Arc<RwLock<HashSet<String>>>,
}
521
522impl OrchestratorServer {
523 pub fn command_policy_decision(&self, command: &str) -> CommandPolicyDecision {
524 let deny_matches = self
525 .config
526 .commands
527 .deny
528 .iter()
529 .map(String::as_str)
530 .map(str::trim)
531 .filter(|entry| !entry.is_empty())
532 .any(|entry| entry == command);
533 if deny_matches {
534 return CommandPolicyDecision::DeniedByDenylist;
535 }
536
537 let mut allow_entries = self
538 .config
539 .commands
540 .allow
541 .iter()
542 .map(String::as_str)
543 .map(str::trim)
544 .filter(|entry| !entry.is_empty())
545 .peekable();
546
547 if allow_entries.peek().is_some() && !allow_entries.any(|entry| entry == command) {
548 return CommandPolicyDecision::DeniedByAllowlist;
549 }
550
551 CommandPolicyDecision::Allowed
552 }
553
554 pub fn is_command_allowed(&self, command: &str) -> bool {
555 self.command_policy_decision(command).is_allowed()
556 }
557
558 #[allow(clippy::allow_attributes, dead_code)]
567 pub async fn start() -> anyhow::Result<Arc<Self>> {
568 Ok(Arc::new(Self::start_impl().await?))
569 }
570
571 pub async fn start_lazy() -> anyhow::Result<Self> {
581 Self::start_lazy_with_config(None).await
582 }
583
584 pub async fn start_lazy_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
595 if managed_guard_enabled() {
596 anyhow::bail!(ORCHESTRATOR_MANAGED_GUARD_MESSAGE);
597 }
598
599 init_with_retry(|_attempt| {
600 let cfg = config_json.clone();
601 async move { Self::start_impl_with_config(cfg).await }
602 })
603 .await
604 }
605
606 async fn start_impl() -> anyhow::Result<Self> {
608 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
609
610 let config = match agentic_config::loader::load_merged(&cwd) {
612 Ok(loaded) => {
613 for w in &loaded.warnings {
614 tracing::warn!("{w}");
615 }
616 loaded.config.orchestrator
617 }
618 Err(e) => {
619 tracing::warn!("Failed to load config, using defaults: {e}");
620 OrchestratorConfig::default()
621 }
622 };
623
624 let launcher_config = version::resolve_launcher_config(&cwd)
625 .context("Failed to resolve OpenCode launcher configuration")?;
626
627 tracing::info!(
628 binary = %launcher_config.binary,
629 launcher_args = ?launcher_config.launcher_args,
630 expected_version = %version::PINNED_OPENCODE_VERSION,
631 "starting embedded opencode serve (pinned stable)"
632 );
633
634 let opts = ServerOptions::default()
635 .binary(&launcher_config.binary)
636 .launcher_args(launcher_config.launcher_args)
637 .directory(cwd.clone());
638
639 let managed = ManagedServer::start(opts)
640 .await
641 .context("Failed to start embedded `opencode serve`")?;
642
643 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
645
646 let client = Client::builder()
647 .base_url(&base_url)
648 .directory(cwd.to_string_lossy().to_string())
649 .build()
650 .context("Failed to build opencode-rs HTTP client")?;
651
652 let health = client
653 .misc()
654 .health()
655 .await
656 .context("Failed to fetch /global/health for version validation")?;
657
658 version::validate_exact_version(health.version.as_deref()).with_context(|| {
659 format!(
660 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
661 version::PINNED_OPENCODE_VERSION,
662 launcher_config.binary
663 )
664 })?;
665
666 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
668 tracing::warn!("Failed to load model limits: {}", e);
669 HashMap::new()
670 });
671
672 tracing::info!("Loaded {} model context limits", model_context_limits.len());
673
674 Ok(Self {
675 managed_server: StdMutex::new(Some(managed)),
676 client,
677 model_context_limits,
678 base_url,
679 config,
680 spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
681 })
682 }
683
684 async fn start_impl_with_config(config_json: Option<String>) -> anyhow::Result<Self> {
686 let cwd = std::env::current_dir().context("Failed to resolve current directory")?;
687
688 let config = match agentic_config::loader::load_merged(&cwd) {
690 Ok(loaded) => {
691 for w in &loaded.warnings {
692 tracing::warn!("{w}");
693 }
694 loaded.config.orchestrator
695 }
696 Err(e) => {
697 tracing::warn!("Failed to load config, using defaults: {e}");
698 OrchestratorConfig::default()
699 }
700 };
701
702 let launcher_config = version::resolve_launcher_config(&cwd)
703 .context("Failed to resolve OpenCode launcher configuration")?;
704
705 tracing::info!(
706 binary = %launcher_config.binary,
707 launcher_args = ?launcher_config.launcher_args,
708 expected_version = %version::PINNED_OPENCODE_VERSION,
709 config_injected = config_json.is_some(),
710 "starting embedded opencode serve (pinned stable)"
711 );
712
713 let mut opts = ServerOptions::default()
714 .binary(&launcher_config.binary)
715 .launcher_args(launcher_config.launcher_args)
716 .directory(cwd.clone());
717
718 if let Some(cfg) = config_json {
720 opts = opts.config_json(cfg);
721 }
722
723 let managed = ManagedServer::start(opts)
724 .await
725 .context("Failed to start embedded `opencode serve`")?;
726
727 let base_url = managed.url().to_string().trim_end_matches('/').to_string();
729
730 let client = Client::builder()
731 .base_url(&base_url)
732 .directory(cwd.to_string_lossy().to_string())
733 .build()
734 .context("Failed to build opencode-rs HTTP client")?;
735
736 let health = client
737 .misc()
738 .health()
739 .await
740 .context("Failed to fetch /global/health for version validation")?;
741
742 version::validate_exact_version(health.version.as_deref()).with_context(|| {
743 format!(
744 "Embedded OpenCode server did not match pinned stable v{} (binary={})",
745 version::PINNED_OPENCODE_VERSION,
746 launcher_config.binary
747 )
748 })?;
749
750 let model_context_limits = Self::load_model_limits(&client).await.unwrap_or_else(|e| {
752 tracing::warn!("Failed to load model limits: {}", e);
753 HashMap::new()
754 });
755
756 tracing::info!("Loaded {} model context limits", model_context_limits.len());
757
758 Ok(Self {
759 managed_server: StdMutex::new(Some(managed)),
760 client,
761 model_context_limits,
762 base_url,
763 config,
764 spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
765 })
766 }
767
768 pub fn client(&self) -> &Client {
770 &self.client
771 }
772
773 #[allow(clippy::allow_attributes, dead_code)]
775 pub fn base_url(&self) -> &str {
776 &self.base_url
777 }
778
779 pub fn context_limit(&self, provider_id: &str, model_id: &str) -> Option<u64> {
781 self.model_context_limits
782 .get(&(provider_id.to_string(), model_id.to_string()))
783 .copied()
784 }
785
786 pub fn session_deadline(&self) -> Duration {
788 Duration::from_secs(self.config.session_deadline_secs)
789 }
790
791 pub fn inactivity_timeout(&self) -> Duration {
793 Duration::from_secs(self.config.inactivity_timeout_secs)
794 }
795
796 pub fn compaction_threshold(&self) -> f64 {
798 self.config.compaction_threshold
799 }
800
801 pub fn spawned_sessions(&self) -> &Arc<RwLock<HashSet<String>>> {
803 &self.spawned_sessions
804 }
805
806 fn managed_server_lock(&self) -> std::sync::MutexGuard<'_, Option<ManagedServer>> {
807 self.managed_server
808 .lock()
809 .unwrap_or_else(std::sync::PoisonError::into_inner)
810 }
811
812 fn is_managed(&self) -> bool {
813 self.managed_server_lock().is_some()
814 }
815
816 async fn validate_for_tool_entry(&self) -> anyhow::Result<ServerEntryState> {
817 self.validate_for_tool_entry_with_timeout(TOOL_ENTRY_HEALTH_PROBE_TIMEOUT)
818 .await
819 }
820
821 async fn validate_for_tool_entry_with_timeout(
822 &self,
823 health_probe_timeout: Duration,
824 ) -> anyhow::Result<ServerEntryState> {
825 if self.is_managed() {
826 let is_running = {
827 let mut managed = self.managed_server_lock();
828 managed
829 .as_mut()
830 .is_some_and(opencode_rs::server::ManagedServer::is_running)
831 };
832
833 if !is_running {
834 return Ok(ServerEntryState::NeedsRecovery {
835 reason: "managed child is no longer running".to_string(),
836 });
837 }
838 }
839
840 match tokio::time::timeout(health_probe_timeout, self.client.misc().health()).await {
841 Ok(Ok(health)) if health.healthy => Ok(ServerEntryState::Healthy),
842 Ok(Ok(_health)) => Ok(ServerEntryState::NeedsRecovery {
843 reason: "/global/health reported unhealthy".to_string(),
844 }),
845 Ok(Err(error)) => Ok(ServerEntryState::NeedsRecovery {
846 reason: format!("/global/health probe failed: {error}"),
847 }),
848 Err(_elapsed) => Ok(ServerEntryState::NeedsRecovery {
849 reason: format!("/global/health probe timed out after {health_probe_timeout:?}"),
850 }),
851 }
852 }
853
854 async fn load_model_limits(client: &Client) -> anyhow::Result<HashMap<ModelKey, u64>> {
856 let resp: ProviderListResponse = client.providers().list().await?;
857 let mut limits = HashMap::new();
858
859 for provider in resp.all {
860 for (model_id, model) in provider.models {
861 if let Some(limit) = model.limit.as_ref().and_then(|l| l.context) {
862 limits.insert((provider.id.clone(), model_id), limit);
863 }
864 }
865 }
866
867 Ok(limits)
868 }
869
870 pub fn extract_assistant_text(messages: &[Message]) -> Option<String> {
872 let assistant_msg = messages.iter().rev().find(|m| m.info.role == "assistant")?;
874
875 let text: String = assistant_msg
877 .parts
878 .iter()
879 .filter_map(|p| {
880 if let Part::Text { text, .. } = p {
881 Some(text.as_str())
882 } else {
883 None
884 }
885 })
886 .collect::<Vec<_>>()
887 .join("");
888
889 if text.trim().is_empty() {
890 None
891 } else {
892 Some(text)
893 }
894 }
895}
896
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code, clippy::allow_attributes)]
impl OrchestratorServer {
    /// Wraps an existing client in a shared snapshot without spawning anything.
    pub fn from_client(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
    ) -> Arc<Self> {
        let server = Self::from_client_unshared(client, base_url, mode);
        Arc::new(server)
    }

    /// Like [`Self::from_client`], with an explicit orchestrator config.
    pub fn from_client_with_config(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
        config: OrchestratorConfig,
    ) -> Arc<Self> {
        let server = Self::from_client_unshared_with_config(client, base_url, mode, config);
        Arc::new(server)
    }

    /// Unshared variant of [`Self::from_client`] using the default config.
    pub fn from_client_unshared(
        client: Client,
        base_url: impl Into<String>,
        mode: RecoveryMode,
    ) -> Self {
        let config = OrchestratorConfig::default();
        Self::from_client_unshared_with_config(client, base_url, mode, config)
    }

    /// Builds a snapshot with no managed child around `client`.
    ///
    /// NOTE(review): `_mode` is accepted but ignored. With no managed child `is_managed()` is
    /// false, so a handle built from this snapshot always treats it as external.
    pub fn from_client_unshared_with_config(
        client: Client,
        base_url: impl Into<String>,
        _mode: RecoveryMode,
        config: OrchestratorConfig,
    ) -> Self {
        Self::assemble_for_testing(None, client, base_url.into(), config)
    }

    /// Builds a snapshot that owns `managed`, using the default config.
    pub fn from_managed_for_testing(
        managed: ManagedServer,
        client: Client,
        base_url: impl Into<String>,
    ) -> Self {
        let config = OrchestratorConfig::default();
        Self::from_managed_for_testing_with_config(managed, client, base_url, config)
    }

    /// Builds a snapshot that owns `managed`, with an explicit orchestrator config.
    pub fn from_managed_for_testing_with_config(
        managed: ManagedServer,
        client: Client,
        base_url: impl Into<String>,
        config: OrchestratorConfig,
    ) -> Self {
        Self::assemble_for_testing(Some(managed), client, base_url.into(), config)
    }

    /// Detaches and stops the managed child.
    ///
    /// # Errors
    ///
    /// Fails if no managed child is attached or if stopping it fails.
    pub async fn stop_managed_for_testing(&self) -> anyhow::Result<()> {
        // Detach inside a scope so the std mutex guard is released before awaiting.
        let detached = {
            let mut slot = self.managed_server_lock();
            slot.take()
        };

        let Some(managed) = detached else {
            anyhow::bail!("no managed server is attached to this snapshot");
        };

        managed.stop().await.map_err(Into::into)
    }

    /// Shared field initialisation for the test builders: no model limits, an empty session
    /// set, and `base_url` stripped of trailing `/`.
    fn assemble_for_testing(
        managed: Option<ManagedServer>,
        client: Client,
        base_url: String,
        config: OrchestratorConfig,
    ) -> Self {
        Self {
            managed_server: StdMutex::new(managed),
            client,
            model_context_limits: HashMap::new(),
            base_url: base_url.trim_end_matches('/').to_string(),
            config,
            spawned_sessions: Arc::new(RwLock::new(HashSet::new())),
        }
    }
}
1001
1002#[cfg(test)]
1003mod tests {
1004 use super::*;
1005 use agentic_config::types::OrchestratorCommandsConfig;
1006 use serial_test::serial;
1007 use std::sync::Arc;
1008 use std::sync::atomic::AtomicBool;
1009 use std::sync::atomic::AtomicUsize;
1010 use std::sync::atomic::Ordering;
1011 use std::time::Duration;
1012 use std::time::Instant;
1013 use tokio::io::AsyncReadExt;
1014 use tokio::io::AsyncWriteExt;
1015 use tokio::net::TcpListener;
1016 use tokio::process::Command;
1017 use tokio::sync::Notify;
1018 use wiremock::Mock;
1019 use wiremock::MockServer;
1020 use wiremock::ResponseTemplate;
1021 use wiremock::matchers::method;
1022 use wiremock::matchers::path;
1023
1024 struct ManagedEnvGuard {
1025 previous: Option<std::ffi::OsString>,
1026 }
1027
1028 impl ManagedEnvGuard {
1029 fn new() -> Self {
1030 Self {
1031 previous: std::env::var_os(OPENCODE_ORCHESTRATOR_MANAGED_ENV),
1032 }
1033 }
1034 }
1035
1036 impl Drop for ManagedEnvGuard {
1037 fn drop(&mut self) {
1038 match &self.previous {
1039 Some(value) => unsafe {
1041 std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, value);
1042 },
1043 None => unsafe {
1045 std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV);
1046 },
1047 }
1048 }
1049 }
1050
1051 async fn health_mock_server() -> MockServer {
1052 let mock = MockServer::start().await;
1053 Mock::given(method("GET"))
1054 .and(path("/global/health"))
1055 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1056 "healthy": true,
1057 "version": version::PINNED_OPENCODE_VERSION,
1058 })))
1059 .mount(&mock)
1060 .await;
1061 mock
1062 }
1063
1064 fn test_client(base_url: &str) -> Client {
1065 opencode_rs::ClientBuilder::new()
1066 .base_url(base_url)
1067 .timeout_secs(5)
1068 .build()
1069 .unwrap()
1070 }
1071
1072 fn external_server(base_url: &str) -> OrchestratorServer {
1073 OrchestratorServer::from_client_unshared(
1074 test_client(base_url),
1075 base_url,
1076 RecoveryMode::External,
1077 )
1078 }
1079
1080 fn external_server_with_config(
1081 base_url: &str,
1082 config: OrchestratorConfig,
1083 ) -> OrchestratorServer {
1084 OrchestratorServer::from_client_unshared_with_config(
1085 test_client(base_url),
1086 base_url,
1087 RecoveryMode::External,
1088 config,
1089 )
1090 }
1091
1092 async fn exited_child() -> tokio::process::Child {
1093 let mut child = Command::new("sh").arg("-c").arg("exit 0").spawn().unwrap();
1094 let _status = child.wait().await.unwrap();
1095 child
1096 }
1097
1098 async fn managed_server_with_exited_child(base_url: &str) -> OrchestratorServer {
1099 let managed = ManagedServer::from_child_for_testing(exited_child().await, base_url, 9);
1100 OrchestratorServer::from_managed_for_testing(managed, test_client(base_url), base_url)
1101 }
1102
1103 #[test]
1104 fn command_policy_allows_all_when_allowlist_is_empty() {
1105 let server = external_server_with_config(
1106 "http://127.0.0.1:9",
1107 OrchestratorConfig {
1108 commands: OrchestratorCommandsConfig {
1109 allow: vec![],
1110 deny: vec!["blocked".into()],
1111 },
1112 ..OrchestratorConfig::default()
1113 },
1114 );
1115
1116 assert_eq!(
1117 server.command_policy_decision("plan"),
1118 CommandPolicyDecision::Allowed
1119 );
1120 assert_eq!(
1121 server.command_policy_decision("blocked"),
1122 CommandPolicyDecision::DeniedByDenylist
1123 );
1124 }
1125
1126 #[test]
1127 fn command_policy_trims_entries_and_deny_wins() {
1128 let server = external_server_with_config(
1129 "http://127.0.0.1:9",
1130 OrchestratorConfig {
1131 commands: OrchestratorCommandsConfig {
1132 allow: vec![" plan ".into()],
1133 deny: vec!["plan".into()],
1134 },
1135 ..OrchestratorConfig::default()
1136 },
1137 );
1138
1139 assert_eq!(
1140 server.command_policy_decision("plan"),
1141 CommandPolicyDecision::DeniedByDenylist
1142 );
1143 }
1144
1145 #[test]
1146 fn command_policy_matching_is_case_sensitive() {
1147 let server = external_server_with_config(
1148 "http://127.0.0.1:9",
1149 OrchestratorConfig {
1150 commands: OrchestratorCommandsConfig {
1151 allow: vec!["Plan".into()],
1152 deny: vec!["blocked".into()],
1153 },
1154 ..OrchestratorConfig::default()
1155 },
1156 );
1157
1158 assert_eq!(
1159 server.command_policy_decision("Plan"),
1160 CommandPolicyDecision::Allowed
1161 );
1162 assert_eq!(
1163 server.command_policy_decision("plan"),
1164 CommandPolicyDecision::DeniedByAllowlist
1165 );
1166 assert!(server.is_command_allowed("Plan"));
1167 assert!(!server.is_command_allowed("plan"));
1168 }
1169
1170 struct BlockingHealthServer {
1171 base_url: String,
1172 started_requests: Arc<AtomicUsize>,
1173 started_notify: Arc<Notify>,
1174 released: Arc<AtomicBool>,
1175 release_notify: Arc<Notify>,
1176 task: tokio::task::JoinHandle<()>,
1177 }
1178
1179 impl BlockingHealthServer {
1180 async fn start(expected_requests: usize) -> Self {
1181 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1182 let addr = listener.local_addr().unwrap();
1183 let started_requests = Arc::new(AtomicUsize::new(0));
1184 let started_notify = Arc::new(Notify::new());
1185 let released = Arc::new(AtomicBool::new(false));
1186 let release_notify = Arc::new(Notify::new());
1187 let body = format!(
1188 r#"{{"healthy":true,"version":"{}"}}"#,
1189 version::PINNED_OPENCODE_VERSION
1190 );
1191 let response = Arc::new(format!(
1192 "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
1193 body.len(),
1194 body
1195 ));
1196
1197 let task = tokio::spawn({
1198 let started_requests = Arc::clone(&started_requests);
1199 let started_notify = Arc::clone(&started_notify);
1200 let released = Arc::clone(&released);
1201 let release_notify = Arc::clone(&release_notify);
1202 let response = Arc::clone(&response);
1203
1204 async move {
1205 let mut connections = Vec::with_capacity(expected_requests);
1206
1207 for _ in 0..expected_requests {
1208 let (mut stream, _addr) = listener.accept().await.unwrap();
1209 let started_requests = Arc::clone(&started_requests);
1210 let started_notify = Arc::clone(&started_notify);
1211 let released = Arc::clone(&released);
1212 let release_notify = Arc::clone(&release_notify);
1213 let response = Arc::clone(&response);
1214
1215 connections.push(tokio::spawn(async move {
1216 let mut request = [0_u8; 1024];
1217 let _read = stream.read(&mut request).await.unwrap();
1218 started_requests.fetch_add(1, Ordering::SeqCst);
1219 started_notify.notify_waiters();
1220
1221 loop {
1222 let notified = release_notify.notified();
1223 if released.load(Ordering::SeqCst) {
1224 break;
1225 }
1226 notified.await;
1227 }
1228
1229 stream.write_all(response.as_bytes()).await.unwrap();
1230 stream.shutdown().await.unwrap();
1231 }));
1232 }
1233
1234 for connection in connections {
1235 connection.await.unwrap();
1236 }
1237 }
1238 });
1239
1240 Self {
1241 base_url: format!("http://{addr}"),
1242 started_requests,
1243 started_notify,
1244 released,
1245 release_notify,
1246 task,
1247 }
1248 }
1249
1250 async fn wait_for_requests(&self, expected_requests: usize) {
1251 tokio::time::timeout(Duration::from_secs(1), async {
1252 while self.started_requests.load(Ordering::SeqCst) < expected_requests {
1253 self.started_notify.notified().await;
1254 }
1255 })
1256 .await
1257 .unwrap();
1258 }
1259
1260 fn release(&self) {
1261 self.released.store(true, Ordering::SeqCst);
1262 self.release_notify.notify_waiters();
1263 }
1264 }
1265
1266 impl Drop for BlockingHealthServer {
1267 fn drop(&mut self) {
1268 self.release();
1269 self.task.abort();
1270 }
1271 }
1272
1273 #[tokio::test]
1274 async fn init_with_retry_succeeds_on_first_attempt() {
1275 let attempts = AtomicUsize::new(0);
1276
1277 let result: u32 = init_with_retry(|_| {
1278 let n = attempts.fetch_add(1, Ordering::SeqCst);
1279 async move {
1280 assert_eq!(n, 0, "should only be called once on success");
1282 Ok(42)
1283 }
1284 })
1285 .await
1286 .unwrap();
1287
1288 assert_eq!(result, 42);
1289 assert_eq!(attempts.load(Ordering::SeqCst), 1);
1290 }
1291
1292 #[tokio::test]
1293 async fn init_with_retry_retries_once_and_succeeds() {
1294 let attempts = AtomicUsize::new(0);
1295
1296 let result: u32 = init_with_retry(|_| {
1297 let n = attempts.fetch_add(1, Ordering::SeqCst);
1298 async move {
1299 if n == 0 {
1300 anyhow::bail!("fail first");
1301 }
1302 Ok(42)
1303 }
1304 })
1305 .await
1306 .unwrap();
1307
1308 assert_eq!(result, 42);
1309 assert_eq!(attempts.load(Ordering::SeqCst), 2);
1310 }
1311
1312 #[tokio::test]
1313 async fn init_with_retry_fails_after_two_attempts() {
1314 let attempts = AtomicUsize::new(0);
1315
1316 let err = init_with_retry::<(), _, _>(|_| {
1317 attempts.fetch_add(1, Ordering::SeqCst);
1318 async { anyhow::bail!("always fail") }
1319 })
1320 .await
1321 .unwrap_err();
1322
1323 assert!(err.to_string().contains("always fail"));
1324 assert_eq!(attempts.load(Ordering::SeqCst), 2);
1325 }
1326
1327 #[tokio::test]
1328 async fn handle_serializes_initialization_and_reuses_snapshot() {
1329 let mock = health_mock_server().await;
1330 let base_url = mock.uri();
1331 let handle = Arc::new(OrchestratorServerHandle::new());
1332 let starts = Arc::new(AtomicUsize::new(0));
1333
1334 let first = {
1335 let handle = Arc::clone(&handle);
1336 let starts = Arc::clone(&starts);
1337 let base_url = base_url.clone();
1338 tokio::spawn(async move {
1339 handle
1340 .get_or_recover_with(|| {
1341 let starts = Arc::clone(&starts);
1342 let base_url = base_url.clone();
1343 async move {
1344 starts.fetch_add(1, Ordering::SeqCst);
1345 tokio::time::sleep(Duration::from_millis(50)).await;
1346 Ok(external_server(&base_url))
1347 }
1348 })
1349 .await
1350 })
1351 };
1352
1353 let second = {
1354 let handle = Arc::clone(&handle);
1355 let starts = Arc::clone(&starts);
1356 let base_url = base_url.clone();
1357 tokio::spawn(async move {
1358 handle
1359 .get_or_recover_with(|| {
1360 let starts = Arc::clone(&starts);
1361 let base_url = base_url.clone();
1362 async move {
1363 starts.fetch_add(1, Ordering::SeqCst);
1364 Ok(external_server(&base_url))
1365 }
1366 })
1367 .await
1368 })
1369 };
1370
1371 let first = first.await.unwrap().unwrap();
1372 let second = second.await.unwrap().unwrap();
1373
1374 assert_eq!(starts.load(Ordering::SeqCst), 1);
1375 assert!(Arc::ptr_eq(&first, &second));
1376 }
1377
1378 #[tokio::test]
1379 async fn validate_for_tool_entry_uses_health_for_external_server() {
1380 let mock = health_mock_server().await;
1381 let server = external_server(&mock.uri());
1382
1383 let state = server.validate_for_tool_entry().await.unwrap();
1384
1385 assert_eq!(state, ServerEntryState::Healthy);
1386 let requests = mock.received_requests().await.unwrap();
1387 assert!(
1388 requests
1389 .iter()
1390 .any(|request| request.url.path() == "/global/health"),
1391 "expected /global/health request"
1392 );
1393 }
1394
1395 #[tokio::test]
1396 async fn validate_for_tool_entry_times_out_health_probe() {
1397 let mock = MockServer::start().await;
1398 Mock::given(method("GET"))
1399 .and(path("/global/health"))
1400 .respond_with(
1401 ResponseTemplate::new(200)
1402 .set_delay(Duration::from_secs(30))
1403 .set_body_json(serde_json::json!({
1404 "healthy": true,
1405 "version": version::PINNED_OPENCODE_VERSION,
1406 })),
1407 )
1408 .mount(&mock)
1409 .await;
1410 let server = external_server(&mock.uri());
1411
1412 let state = server
1413 .validate_for_tool_entry_with_timeout(Duration::from_millis(25))
1414 .await
1415 .unwrap();
1416
1417 assert_eq!(
1418 state,
1419 ServerEntryState::NeedsRecovery {
1420 reason: "/global/health probe timed out after 25ms".to_string(),
1421 }
1422 );
1423 }
1424
1425 #[tokio::test]
1426 async fn validate_for_tool_entry_short_circuits_dead_managed_server() {
1427 let server = managed_server_with_exited_child("http://127.0.0.1:9").await;
1428
1429 let state = server.validate_for_tool_entry().await.unwrap();
1430
1431 assert_eq!(
1432 state,
1433 ServerEntryState::NeedsRecovery {
1434 reason: "managed child is no longer running".to_string(),
1435 }
1436 );
1437 }
1438
1439 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1440 async fn handle_allows_concurrent_healthy_acquires_without_serializing_validation() {
1441 let health = BlockingHealthServer::start(3).await;
1442 let handle = Arc::new(OrchestratorServerHandle::from_server_unshared(
1443 external_server(&health.base_url),
1444 ));
1445
1446 let started_at = Instant::now();
1447 let tasks = (0..3)
1448 .map(|_| {
1449 let handle = Arc::clone(&handle);
1450 tokio::spawn(async move { handle.acquire().await })
1451 })
1452 .collect::<Vec<_>>();
1453
1454 health.wait_for_requests(3).await;
1455 tokio::time::sleep(Duration::from_millis(75)).await;
1456 health.release();
1457
1458 let mut snapshots = Vec::with_capacity(tasks.len());
1459 for task in tasks {
1460 snapshots.push(task.await.unwrap().unwrap());
1461 }
1462
1463 assert!(
1464 started_at.elapsed() < Duration::from_millis(250),
1465 "healthy acquires should overlap rather than serialize"
1466 );
1467 assert!(Arc::ptr_eq(&snapshots[0], &snapshots[1]));
1468 assert!(Arc::ptr_eq(&snapshots[1], &snapshots[2]));
1469 }
1470
1471 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1472 async fn handle_single_flights_concurrent_stale_acquires() {
1473 let stale = Arc::new(managed_server_with_exited_child("http://127.0.0.1:9").await);
1474 let handle = Arc::new(OrchestratorServerHandle {
1475 state: AsyncMutex::new(HandleState::Ready {
1476 snapshot: Arc::clone(&stale),
1477 mode: RecoveryMode::Managed,
1478 }),
1479 });
1480 let mock = health_mock_server().await;
1481 let base_url = mock.uri();
1482 let starts = Arc::new(AtomicUsize::new(0));
1483
1484 let tasks = (0..3)
1485 .map(|_| {
1486 let handle = Arc::clone(&handle);
1487 let starts = Arc::clone(&starts);
1488 let base_url = base_url.clone();
1489 tokio::spawn(async move {
1490 handle
1491 .get_or_recover_with(|| {
1492 let starts = Arc::clone(&starts);
1493 let base_url = base_url.clone();
1494 async move {
1495 starts.fetch_add(1, Ordering::SeqCst);
1496 tokio::time::sleep(Duration::from_millis(50)).await;
1497 Ok(external_server(&base_url))
1498 }
1499 })
1500 .await
1501 })
1502 })
1503 .collect::<Vec<_>>();
1504
1505 let mut snapshots = Vec::with_capacity(tasks.len());
1506 for task in tasks {
1507 snapshots.push(task.await.unwrap().unwrap());
1508 }
1509
1510 assert_eq!(starts.load(Ordering::SeqCst), 1);
1511 assert!(!Arc::ptr_eq(&stale, &snapshots[0]));
1512 assert!(Arc::ptr_eq(&snapshots[0], &snapshots[1]));
1513 assert!(Arc::ptr_eq(&snapshots[1], &snapshots[2]));
1514 }
1515
1516 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1517 async fn handle_retries_if_cache_changes_while_validating() {
1518 let old_health = BlockingHealthServer::start(1).await;
1519 let original = Arc::new(external_server(&old_health.base_url));
1520 let handle = Arc::new(OrchestratorServerHandle {
1521 state: AsyncMutex::new(HandleState::Ready {
1522 snapshot: Arc::clone(&original),
1523 mode: RecoveryMode::External,
1524 }),
1525 });
1526 let replacement_mock = health_mock_server().await;
1527 let replacement = Arc::new(external_server(&replacement_mock.uri()));
1528
1529 let acquire = {
1530 let handle = Arc::clone(&handle);
1531 tokio::spawn(async move {
1532 handle
1533 .acquire_or_recover_with(|| async { anyhow::bail!("should not rebuild") })
1534 .await
1535 })
1536 };
1537
1538 old_health.wait_for_requests(1).await;
1539
1540 {
1541 let mut state = tokio::time::timeout(Duration::from_millis(100), handle.state.lock())
1542 .await
1543 .expect("validation should not hold the handle mutex");
1544 *state = HandleState::Ready {
1545 snapshot: Arc::clone(&replacement),
1546 mode: RecoveryMode::External,
1547 };
1548 }
1549
1550 old_health.release();
1551
1552 let snapshot = acquire.await.unwrap().unwrap();
1553
1554 assert!(!Arc::ptr_eq(&snapshot, &original));
1555 assert!(Arc::ptr_eq(&snapshot, &replacement));
1556 }
1557
1558 #[tokio::test]
1559 async fn handle_rebuilds_without_invalidating_held_snapshot() {
1560 let stale = Arc::new(managed_server_with_exited_child("http://127.0.0.1:9").await);
1561 let handle = OrchestratorServerHandle {
1562 state: AsyncMutex::new(HandleState::Ready {
1563 snapshot: Arc::clone(&stale),
1564 mode: RecoveryMode::Managed,
1565 }),
1566 };
1567 let mock = health_mock_server().await;
1568 let base_url = mock.uri();
1569 let starts = Arc::new(AtomicUsize::new(0));
1570
1571 let rebuilt = handle
1572 .get_or_recover_with(|| {
1573 let starts = Arc::clone(&starts);
1574 let base_url = base_url.clone();
1575 async move {
1576 starts.fetch_add(1, Ordering::SeqCst);
1577 Ok(external_server(&base_url))
1578 }
1579 })
1580 .await
1581 .unwrap();
1582
1583 assert_eq!(starts.load(Ordering::SeqCst), 1);
1584 assert!(!Arc::ptr_eq(&stale, &rebuilt));
1585 assert_eq!(stale.base_url(), "http://127.0.0.1:9");
1586 assert_eq!(rebuilt.base_url(), base_url.trim_end_matches('/'));
1587 }
1588
1589 #[test]
1590 #[serial(env)]
1591 fn managed_guard_disabled_when_env_not_set() {
1592 let _env = ManagedEnvGuard::new();
1593 unsafe { std::env::remove_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV) };
1596 assert!(!managed_guard_enabled());
1597 }
1598
1599 #[test]
1600 #[serial(env)]
1601 fn managed_guard_enabled_when_env_is_1() {
1602 let _env = ManagedEnvGuard::new();
1603 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
1605 assert!(managed_guard_enabled());
1606 }
1607
1608 #[test]
1609 #[serial(env)]
1610 fn managed_guard_disabled_when_env_is_0() {
1611 let _env = ManagedEnvGuard::new();
1612 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "0") };
1614 assert!(!managed_guard_enabled());
1615 }
1616
1617 #[test]
1618 #[serial(env)]
1619 fn managed_guard_disabled_when_env_is_empty() {
1620 let _env = ManagedEnvGuard::new();
1621 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "") };
1623 assert!(!managed_guard_enabled());
1624 }
1625
1626 #[test]
1627 #[serial(env)]
1628 fn managed_guard_disabled_when_env_is_whitespace() {
1629 let _env = ManagedEnvGuard::new();
1630 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, " ") };
1632 assert!(!managed_guard_enabled());
1633 }
1634
1635 #[test]
1636 #[serial(env)]
1637 fn managed_guard_enabled_when_env_is_truthy() {
1638 let _env = ManagedEnvGuard::new();
1639 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "true") };
1641 assert!(managed_guard_enabled());
1642 }
1643
1644 #[tokio::test]
1645 #[serial(env)]
1646 async fn recursion_guard_only_blocks_real_startup_paths() {
1647 let _env = ManagedEnvGuard::new();
1648 unsafe { std::env::set_var(OPENCODE_ORCHESTRATOR_MANAGED_ENV, "1") };
1650
1651 let mock = health_mock_server().await;
1652 let handle = OrchestratorServerHandle::from_server_unshared(external_server(&mock.uri()));
1653 let reused = handle
1654 .get_or_recover_with(|| async { anyhow::bail!("should not start") })
1655 .await
1656 .unwrap();
1657 assert_eq!(reused.base_url(), mock.uri().trim_end_matches('/'));
1658
1659 let fresh_handle = OrchestratorServerHandle::new();
1660 let err = match fresh_handle.acquire().await {
1661 Ok(_server) => panic!("expected recursion guard to block fresh startup"),
1662 Err(error) => error,
1663 };
1664 assert!(err.to_string().contains(ORCHESTRATOR_MANAGED_GUARD_MESSAGE));
1665 }
1666
1667 #[tokio::test]
1668 async fn external_failure_becomes_sticky_and_typed() {
1669 let handle = OrchestratorServerHandle::from_server_unshared(
1670 OrchestratorServer::from_client_unshared(
1671 test_client("http://127.0.0.1:9"),
1672 "http://127.0.0.1:9",
1673 RecoveryMode::External,
1674 ),
1675 );
1676 let starts = AtomicUsize::new(0);
1677
1678 let first = handle
1679 .acquire_or_recover_with(|| {
1680 starts.fetch_add(1, Ordering::SeqCst);
1681 async { anyhow::bail!("should not rebuild external servers") }
1682 })
1683 .await;
1684 let second = handle
1685 .acquire_or_recover_with(|| {
1686 starts.fetch_add(1, Ordering::SeqCst);
1687 async { anyhow::bail!("should not rebuild external servers") }
1688 })
1689 .await;
1690
1691 let first = match first {
1692 Ok(_snapshot) => panic!("expected typed external failure on first acquire"),
1693 Err(error) => error,
1694 };
1695 let second = match second {
1696 Ok(_snapshot) => panic!("expected sticky external failure on second acquire"),
1697 Err(error) => error,
1698 };
1699
1700 assert_eq!(starts.load(Ordering::SeqCst), 0);
1701 assert!(
1702 first
1703 .to_string()
1704 .contains("External OpenCode server unavailable"),
1705 "expected typed external failure, got: {first}"
1706 );
1707 assert_eq!(first.to_string(), second.to_string());
1708 }
1709}