use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use super::command_policy::{
swap_command_policy_hook_depth, swap_command_policy_stack, CommandPolicy,
};
use super::policy::{
swap_approval_policy_stack, swap_execution_policy_stack, swap_trusted_bridge_depth,
CapabilityPolicy, ToolApprovalPolicy,
};
use super::{swap_mutation_session, MutationSessionRecord, RunExecutionRecord};
use crate::autonomy::{swap_autonomy_policy_stack, AutonomyPolicy};
use crate::connectors::harn_module::swap_active_harn_connector_ctx;
use crate::connectors::ConnectorCtx;
use crate::llm::permissions::{swap_dynamic_permission_stack, DynamicPermissionPolicy};
use crate::runtime_context::{swap_runtime_context_overlay_stack, RuntimeContextOverlay};
use crate::stdlib::process::{swap_source_dir, swap_thread_execution_context};
use crate::stdlib::template::llm_context::{swap_llm_render_stack, LlmRenderContext};
#[derive(Default, Clone)]
pub(crate) struct AmbientExecutionScope {
execution: Vec<CapabilityPolicy>,
approval: Vec<ToolApprovalPolicy>,
command: Vec<CommandPolicy>,
permissions: Vec<DynamicPermissionPolicy>,
runtime_context: Vec<RuntimeContextOverlay>,
autonomy: Vec<AutonomyPolicy>,
llm_render: Vec<LlmRenderContext>,
connector_ctx: Vec<ConnectorCtx>,
execution_context: Option<RunExecutionRecord>,
source_dir: Option<PathBuf>,
mutation_session: Option<MutationSessionRecord>,
trusted_depth: usize,
command_hook_depth: usize,
}
fn clone_via_swap<T: Clone + Default>(swap: impl Fn(T) -> T) -> T {
let owned = swap(T::default());
let cloned = owned.clone();
let _ = swap(owned);
cloned
}
impl AmbientExecutionScope {
pub(crate) fn capture_inherited() -> Self {
Self {
command: clone_via_swap(swap_command_policy_stack),
permissions: clone_via_swap(swap_dynamic_permission_stack),
runtime_context: clone_via_swap(swap_runtime_context_overlay_stack),
autonomy: clone_via_swap(swap_autonomy_policy_stack),
execution_context: clone_via_swap(swap_thread_execution_context),
source_dir: clone_via_swap(swap_source_dir),
mutation_session: clone_via_swap(swap_mutation_session),
..Self::default()
}
}
fn swap_in(self) -> Self {
Self {
execution: swap_execution_policy_stack(self.execution),
approval: swap_approval_policy_stack(self.approval),
command: swap_command_policy_stack(self.command),
permissions: swap_dynamic_permission_stack(self.permissions),
runtime_context: swap_runtime_context_overlay_stack(self.runtime_context),
autonomy: swap_autonomy_policy_stack(self.autonomy),
llm_render: swap_llm_render_stack(self.llm_render),
connector_ctx: swap_active_harn_connector_ctx(self.connector_ctx),
execution_context: swap_thread_execution_context(self.execution_context),
source_dir: swap_source_dir(self.source_dir),
mutation_session: swap_mutation_session(self.mutation_session),
trusted_depth: swap_trusted_bridge_depth(self.trusted_depth),
command_hook_depth: swap_command_policy_hook_depth(self.command_hook_depth),
}
}
}
#[cfg(test)]
#[derive(Clone, Copy)]
enum AmbientScoping {
Captured,
Uncaptured(&'static str),
}
#[cfg(test)]
const AMBIENT_THREAD_LOCAL_CATALOG: &[(&str, AmbientScoping)] = &[
("EXECUTION_POLICY_STACK", AmbientScoping::Captured),
("EXECUTION_APPROVAL_POLICY_STACK", AmbientScoping::Captured),
("COMMAND_POLICY_STACK", AmbientScoping::Captured),
("DYNAMIC_PERMISSION_STACK", AmbientScoping::Captured),
("RUNTIME_CONTEXT_OVERLAY_STACK", AmbientScoping::Captured),
("AUTONOMY_POLICY_STACK", AmbientScoping::Captured),
("LLM_RENDER_STACK", AmbientScoping::Captured),
("ACTIVE_HARN_CONNECTOR_CTX", AmbientScoping::Captured),
("TRUSTED_BRIDGE_CALL_DEPTH", AmbientScoping::Captured),
("COMMAND_POLICY_HOOK_DEPTH", AmbientScoping::Captured),
("VM_EXECUTION_CONTEXT", AmbientScoping::Captured),
("VM_SOURCE_DIR", AmbientScoping::Captured),
("CURRENT_MUTATION_SESSION", AmbientScoping::Captured),
(
"SECURITY_POLICY_STACK",
AmbientScoping::Uncaptured(
"[latent-capability] security/mod.rs MCP-schema/security policy; not \
set per-worker today. Capture when a fan-out child reads it across an await.",
),
),
(
"ACTIVE_TENANT_STACK",
AmbientScoping::Uncaptured(
"[latent-capability] harness_tenant.rs tenant identity; not set per-worker today.",
),
),
(
"ACTIVE_PRINCIPAL_STACK",
AmbientScoping::Uncaptured(
"[latent-capability] harness_auth.rs principal identity; not set per-worker today.",
),
),
(
"REQUIRE_EXPLICIT_EGRESS_POLICY_DEPTH",
AmbientScoping::Uncaptured(
"[latent-capability] egress/mod.rs egress-policy enforcement depth; not entered \
per-worker today.",
),
),
(
"REQUIRE_SSRF_GUARD_DEPTH",
AmbientScoping::Uncaptured(
"[latent-capability] egress/mod.rs SSRF-guard depth; not entered per-worker today.",
),
),
(
"REDACTION_POLICY_STACK",
AmbientScoping::Uncaptured(
"[latent-capability] redact/mod.rs redaction policy; pushed around synchronous \
redaction, not held across a child await today.",
),
),
(
"ACTIVE_REQUEST_ID_STACK",
AmbientScoping::Uncaptured(
"[latent-capability] observability/request_id.rs request-id breadcrumb; attribution \
only, no capability decision rides on it.",
),
),
(
"PERSONA_STACK",
AmbientScoping::Uncaptured(
"step_runtime.rs snapshots+restores this at the worker boundary (own isolation \
path); not read raw across a fan-out child await.",
),
),
(
"STEP_STACK",
AmbientScoping::Uncaptured(
"step_runtime.rs snapshots+restores this at the worker boundary (own isolation \
path); not read raw across a fan-out child await.",
),
),
(
"CURRENT_SESSION_STACK",
AmbientScoping::Uncaptured(
"agent_sessions.rs session breadcrumb; each worker opens its own session at \
startup. Audited 2026-06-28: not read across a child await.",
),
),
(
"CURRENT_TOOL_CALL_STACK",
AmbientScoping::Uncaptured(
"agent_sessions.rs tool-call breadcrumb; pushed+popped within a single synchronous \
dispatch frame.",
),
),
(
"TRANSCRIPT_DIR_STACK",
AmbientScoping::Uncaptured(
"llm/agent_observe.rs transcript output dir; push/pop balanced within an observe \
scope.",
),
),
(
"VM_TRACE_STACK",
AmbientScoping::Uncaptured(
"stdlib/logging.rs log-trace breadcrumb (trace ids for log lines); attribution \
only, no capability decision rides on it.",
),
),
(
"ACTIVE_DISPATCH_CONTEXT",
AmbientScoping::Uncaptured(
"triggers/dispatcher trigger-dispatch context for the dispatcher runner, not the \
fan-out worker path.",
),
),
(
"CURRENT_WORKFLOW_SKILL_CONTEXT",
AmbientScoping::Uncaptured(
"orchestration/mod.rs workflow skill context; the workflow runner pins itself to \
one LocalSet task, so every stage observes the same context (see its doc-comment).",
),
),
];
#[cfg(test)]
const AUDITED_LATENT_CAPABILITIES: &[&str] = &[
"SECURITY_POLICY_STACK",
"ACTIVE_TENANT_STACK",
"ACTIVE_PRINCIPAL_STACK",
"REQUIRE_EXPLICIT_EGRESS_POLICY_DEPTH",
"REQUIRE_SSRF_GUARD_DEPTH",
];
pin_project! {
pub(crate) struct Scoped<F> {
#[pin]
inner: F,
scope: Option<AmbientExecutionScope>,
}
}
pub(crate) fn scope_ambient<F: Future>(scope: AmbientExecutionScope, inner: F) -> Scoped<F> {
Scoped {
inner,
scope: Some(scope),
}
}
struct RestoreGuard<'a> {
outer: Option<AmbientExecutionScope>,
slot: &'a mut Option<AmbientExecutionScope>,
}
impl Drop for RestoreGuard<'_> {
fn drop(&mut self) {
if let Some(outer) = self.outer.take() {
*self.slot = Some(outer.swap_in());
}
}
}
impl<F: Future> Future for Scoped<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
let this = self.project();
let task_scope = this.scope.take().unwrap_or_default();
let outer = task_scope.swap_in();
let _restore = RestoreGuard {
outer: Some(outer),
slot: this.scope,
};
this.inner.poll(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::orchestration::{current_execution_policy, push_execution_policy};
fn policy_named(tool: &str) -> CapabilityPolicy {
CapabilityPolicy {
tools: vec![tool.to_string()],
..Default::default()
}
}
#[tokio::test]
async fn scoped_tasks_do_not_cross_wire_execution_policy() {
let local = tokio::task::LocalSet::new();
local
.run_until(async {
let alpha = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
push_execution_policy(policy_named("alpha"));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_execution_policy().map(|p| p.tools)
},
));
let beta = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
push_execution_policy(policy_named("beta"));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_execution_policy().map(|p| p.tools)
},
));
assert_eq!(alpha.await.unwrap(), Some(vec!["alpha".to_string()]));
assert_eq!(beta.await.unwrap(), Some(vec!["beta".to_string()]));
})
.await;
assert!(current_execution_policy().is_none());
}
#[tokio::test]
async fn scope_is_restored_after_completion() {
let local = tokio::task::LocalSet::new();
local
.run_until(async {
tokio::task::spawn_local(scope_ambient(AmbientExecutionScope::default(), async {
push_execution_policy(policy_named("gamma"));
tokio::task::yield_now().await;
}))
.await
.unwrap();
})
.await;
assert!(current_execution_policy().is_none());
}
fn execution_context_named(name: &str) -> RunExecutionRecord {
let mut env = std::collections::BTreeMap::new();
env.insert("WORKER".to_string(), name.to_string());
RunExecutionRecord {
cwd: Some(format!("/worktrees/{name}")),
env,
..Default::default()
}
}
fn mutation_session_named(name: &str) -> MutationSessionRecord {
MutationSessionRecord {
session_id: format!("session-{name}"),
run_id: Some(format!("run-{name}")),
..Default::default()
}
}
#[tokio::test]
async fn scoped_tasks_do_not_cross_wire_execution_context() {
use crate::stdlib::process::{current_execution_context, set_thread_execution_context};
let local = tokio::task::LocalSet::new();
local
.run_until(async {
let alpha = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
set_thread_execution_context(Some(execution_context_named("alpha")));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_execution_context()
.map(|ctx| (ctx.cwd, ctx.env.get("WORKER").cloned()))
},
));
let beta = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
set_thread_execution_context(Some(execution_context_named("beta")));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_execution_context()
.map(|ctx| (ctx.cwd, ctx.env.get("WORKER").cloned()))
},
));
assert_eq!(
alpha.await.unwrap(),
Some((
Some("/worktrees/alpha".to_string()),
Some("alpha".to_string())
))
);
assert_eq!(
beta.await.unwrap(),
Some((
Some("/worktrees/beta".to_string()),
Some("beta".to_string())
))
);
})
.await;
assert!(crate::stdlib::process::current_execution_context().is_none());
}
#[tokio::test]
async fn scoped_tasks_do_not_cross_wire_mutation_session() {
use crate::orchestration::{current_mutation_session, install_current_mutation_session};
let local = tokio::task::LocalSet::new();
local
.run_until(async {
let alpha = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
install_current_mutation_session(Some(mutation_session_named("alpha")));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_mutation_session().map(|s| (s.session_id, s.run_id))
},
));
let beta = tokio::task::spawn_local(scope_ambient(
AmbientExecutionScope::default(),
async {
install_current_mutation_session(Some(mutation_session_named("beta")));
tokio::task::yield_now().await;
tokio::task::yield_now().await;
current_mutation_session().map(|s| (s.session_id, s.run_id))
},
));
assert_eq!(
alpha.await.unwrap(),
Some(("session-alpha".to_string(), Some("run-alpha".to_string())))
);
assert_eq!(
beta.await.unwrap(),
Some(("session-beta".to_string(), Some("run-beta".to_string())))
);
})
.await;
assert!(crate::orchestration::current_mutation_session().is_none());
}
#[test]
fn drift_every_ambient_shape_thread_local_is_cataloged() {
use std::collections::BTreeSet;
fn is_ambient_shape(name: &str) -> bool {
name == "VM_SOURCE_DIR"
|| name.ends_with("_STACK")
|| name.ends_with("_DEPTH")
|| name.ends_with("_CONTEXT")
|| name.ends_with("_SESSION")
|| name.ends_with("_CTX")
}
fn collect(dir: &std::path::Path, out: &mut BTreeSet<String>) {
for entry in std::fs::read_dir(dir).expect("read_dir src") {
let path = entry.expect("dir entry").path();
if path.to_string_lossy().contains("test") {
continue;
}
if path.is_dir() {
collect(&path, out);
} else if path.extension().and_then(|e| e.to_str()) == Some("rs") {
let content = std::fs::read_to_string(&path).expect("read src file");
for line in content.lines() {
if !line.contains("RefCell") {
continue;
}
let Some(idx) = line.find("static ") else {
continue;
};
let after = &line[idx + "static ".len()..];
let name: String = after
.chars()
.take_while(|c| c.is_ascii_alphanumeric() || *c == '_')
.collect();
if !name.is_empty() && is_ambient_shape(&name) {
out.insert(name);
}
}
}
}
}
let src = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
let mut discovered = BTreeSet::new();
collect(&src, &mut discovered);
let cataloged: BTreeSet<String> = AMBIENT_THREAD_LOCAL_CATALOG
.iter()
.map(|(name, _)| (*name).to_string())
.collect();
let missing: Vec<_> = discovered.difference(&cataloged).cloned().collect();
assert!(
missing.is_empty(),
"new ambient-shape thread-local(s) not classified in \
AMBIENT_THREAD_LOCAL_CATALOG (orchestration/ambient_scope.rs): {missing:?}. Decide \
whether each must be Captured into AmbientExecutionScope (it is held across a \
fan-out worker's awaits and would otherwise cross-wire siblings) or is safely \
Uncaptured, then add it to the catalog. This is the F1/F2 drift guard."
);
let stale: Vec<_> = cataloged.difference(&discovered).cloned().collect();
assert!(
stale.is_empty(),
"AMBIENT_THREAD_LOCAL_CATALOG names thread-local(s) no longer in src \
(renamed/removed?): {stale:?}. Update the catalog."
);
}
#[test]
fn captured_catalog_matches_scope_fields() {
use std::collections::BTreeSet;
let captured: BTreeSet<&str> = AMBIENT_THREAD_LOCAL_CATALOG
.iter()
.filter(|(_, scoping)| matches!(scoping, AmbientScoping::Captured))
.map(|(name, _)| *name)
.collect();
let expected: BTreeSet<&str> = [
"EXECUTION_POLICY_STACK",
"EXECUTION_APPROVAL_POLICY_STACK",
"COMMAND_POLICY_STACK",
"DYNAMIC_PERMISSION_STACK",
"RUNTIME_CONTEXT_OVERLAY_STACK",
"AUTONOMY_POLICY_STACK",
"LLM_RENDER_STACK",
"ACTIVE_HARN_CONNECTOR_CTX",
"TRUSTED_BRIDGE_CALL_DEPTH",
"COMMAND_POLICY_HOOK_DEPTH",
"VM_EXECUTION_CONTEXT",
"VM_SOURCE_DIR",
"CURRENT_MUTATION_SESSION",
]
.into_iter()
.collect();
assert_eq!(
captured, expected,
"the catalog's Captured set diverged from AmbientExecutionScope's swapped fields; \
keep the struct fields, swap_in, and the catalog in lockstep."
);
}
#[test]
fn audited_latent_capabilities_are_cataloged() {
for latent in AUDITED_LATENT_CAPABILITIES {
let found = AMBIENT_THREAD_LOCAL_CATALOG
.iter()
.find(|(name, _)| name == latent);
let Some((_, scoping)) = found else {
panic!("{latent} missing from AMBIENT_THREAD_LOCAL_CATALOG");
};
match scoping {
AmbientScoping::Uncaptured(reason) => assert!(
reason.contains("[latent-capability]"),
"{latent} must keep its [latent-capability] reason tag so the call-out stays visible"
),
AmbientScoping::Captured => panic!(
"{latent} is now Captured — wire it fully and drop it from \
AUDITED_LATENT_CAPABILITIES"
),
}
}
}
}