use crate::error::Error;
use crate::observability::ObservabilityConfig;
use crate::observability::metrics::MetricsProvider;
use crate::record::RegionLimits;
use crate::runtime::RuntimeState;
use crate::runtime::SpawnError;
use crate::runtime::config::RuntimeConfig;
use crate::runtime::deadline_monitor::{
AdaptiveDeadlineConfig, DeadlineTaskSnapshot, DeadlineWarning, MonitorConfig,
default_warning_handler,
};
use crate::runtime::io_driver::IoDriverHandle;
use crate::runtime::reactor::Reactor;
use crate::runtime::scheduler::{ThreeLaneScheduler, ThreeLaneWorker};
use crate::time::TimerDriverHandle;
use crate::trace::distributed::LogicalClockMode;
use crate::types::{Budget, CancelAttributionConfig};
use crate::util::EntropySource;
#[cfg(target_arch = "wasm32")]
use js_sys::{Reflect, global};
use parking_lot::{Mutex, MutexGuard};
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use thiserror::Error as ThisError;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::JsValue;
use crate::types::{
WasmAbiOutcomeEnvelope, WasmAbiVersion, WasmAbortPropagationMode, WasmDispatchError,
WasmDispatcherDiagnostics, WasmExportDispatcher, WasmHandleRef, WasmScopeEnterBuilder,
};
thread_local! {
static CURRENT_RUNTIME_HANDLE: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
}
struct ScopedRuntimeHandle {
prev: Option<RuntimeHandle>,
}
impl ScopedRuntimeHandle {
fn new(handle: RuntimeHandle) -> Self {
let prev = CURRENT_RUNTIME_HANDLE.with(|cell| cell.replace(Some(handle)));
Self { prev }
}
}
impl Drop for ScopedRuntimeHandle {
fn drop(&mut self) {
let prev = self.prev.take();
let _ = CURRENT_RUNTIME_HANDLE.try_with(|cell| {
*cell.borrow_mut() = prev;
});
}
}
#[allow(dead_code)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RuntimeHostServicesKind {
NativeStdThread,
}
#[allow(dead_code)] impl RuntimeHostServicesKind {
const fn as_str(self) -> &'static str {
match self {
Self::NativeStdThread => "native-std-thread",
}
}
}
#[allow(dead_code)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BrowserHostServicesContract {
required_capabilities: &'static [&'static str],
}
#[allow(dead_code)] impl BrowserHostServicesContract {
const V1: Self = Self {
required_capabilities: &[
"host-turn wakeups",
"worker bootstrap hooks",
"timer/deadline driving",
"lane-health callbacks",
],
};
fn diagnostic_requirements(self) -> &'static str {
if self
.required_capabilities
.contains(&"lane-health callbacks")
{
"host-turn wakeups, worker bootstrap hooks, timer/deadline driving, and lane-health callbacks for threadless startup"
} else {
"browser host-services contract requirements"
}
}
}
struct DeadlineMonitorHostService {
shutdown: Option<Arc<std::sync::atomic::AtomicBool>>,
thread: Option<std::thread::JoinHandle<()>>,
}
impl DeadlineMonitorHostService {
const fn disabled() -> Self {
Self {
shutdown: None,
thread: None,
}
}
}
trait RuntimeHostServices: Send + Sync {
fn kind(&self) -> RuntimeHostServicesKind;
fn browser_contract(&self) -> BrowserHostServicesContract {
BrowserHostServicesContract::V1
}
fn spawn_workers(
&self,
runtime: &Arc<RuntimeInner>,
workers: Vec<ThreeLaneWorker>,
) -> io::Result<Vec<std::thread::JoinHandle<()>>>;
fn start_deadline_monitor(
&self,
config: &RuntimeConfig,
state: &Arc<crate::sync::ContendedMutex<RuntimeState>>,
) -> DeadlineMonitorHostService;
}
#[derive(Default)]
struct NativeThreadHostServices;
impl NativeThreadHostServices {
const fn new() -> Self {
Self
}
fn spawn_worker_threads(
runtime: &Arc<RuntimeInner>,
workers: Vec<ThreeLaneWorker>,
) -> io::Result<Vec<std::thread::JoinHandle<()>>> {
let mut worker_threads: Vec<std::thread::JoinHandle<()>> = Vec::new();
if runtime.config.worker_threads == 0 {
return Ok(worker_threads);
}
for worker in workers {
let name = {
let id = worker.id;
format!("{}-{id}", runtime.config.thread_name_prefix)
};
let runtime_handle = RuntimeHandle::weak(runtime);
let on_start = runtime.config.on_thread_start.clone();
let on_stop = runtime.config.on_thread_stop.clone();
let mut builder = std::thread::Builder::new().name(name);
if runtime.config.thread_stack_size > 0 {
builder = builder.stack_size(runtime.config.thread_stack_size);
}
let handle = builder
.spawn(move || {
let _guard = ScopedRuntimeHandle::new(runtime_handle);
if let Some(callback) = on_start.as_ref() {
callback();
}
let mut worker = worker;
worker.run_loop();
if let Some(callback) = on_stop.as_ref() {
callback();
}
})
.map_err(|e| {
runtime.scheduler.shutdown();
while let Some(handle) = worker_threads.pop() {
let _ = handle.join();
}
io::Error::other(format!("failed to spawn worker thread: {e}"))
})?;
worker_threads.push(handle);
}
Ok(worker_threads)
}
fn start_deadline_monitor(
config: &RuntimeConfig,
state: &Arc<crate::sync::ContendedMutex<RuntimeState>>,
) -> DeadlineMonitorHostService {
use crate::runtime::deadline_monitor::DeadlineMonitor;
use std::sync::atomic::AtomicBool;
let monitor_config = match config.deadline_monitor {
Some(ref mc) if mc.enabled => mc,
_ => return DeadlineMonitorHostService::disabled(),
};
let dm_shutdown = Arc::new(AtomicBool::new(false));
let dm_shutdown_clone = Arc::clone(&dm_shutdown);
let dm_state = Arc::clone(state);
let check_interval = monitor_config.check_interval;
let mut monitor = DeadlineMonitor::new(monitor_config.clone());
if let Some(ref handler) = config.deadline_warning_handler {
let handler = Arc::clone(handler);
monitor.on_warning(move |w| handler(w));
}
monitor.set_metrics_provider(Arc::clone(&config.metrics_provider));
let thread_name = format!("{}-deadline-monitor", config.thread_name_prefix);
let thread = std::thread::Builder::new()
.name(thread_name)
.spawn(move || {
while !dm_shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(check_interval);
if dm_shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let guard = dm_state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let now = guard.now;
let tasks = guard
.tasks_iter()
.map(|(_, record)| DeadlineTaskSnapshot::from_task_record(record))
.collect::<Vec<_>>();
drop(guard);
monitor.check_snapshots(now, tasks);
}
})
.ok();
DeadlineMonitorHostService {
shutdown: Some(dm_shutdown),
thread,
}
}
}
impl RuntimeHostServices for NativeThreadHostServices {
fn kind(&self) -> RuntimeHostServicesKind {
RuntimeHostServicesKind::NativeStdThread
}
fn spawn_workers(
&self,
runtime: &Arc<RuntimeInner>,
workers: Vec<ThreeLaneWorker>,
) -> io::Result<Vec<std::thread::JoinHandle<()>>> {
Self::spawn_worker_threads(runtime, workers)
}
fn start_deadline_monitor(
&self,
config: &RuntimeConfig,
state: &Arc<crate::sync::ContendedMutex<RuntimeState>>,
) -> DeadlineMonitorHostService {
Self::start_deadline_monitor(config, state)
}
}
fn default_runtime_host_services() -> Arc<dyn RuntimeHostServices> {
Arc::new(NativeThreadHostServices::new())
}
#[allow(dead_code)] fn unsupported_browser_bootstrap_message(host_services: &dyn RuntimeHostServices) -> String {
let contract = host_services.browser_contract();
format!(
"RuntimeBuilder browser bootstrap is not yet supported on wasm browser profiles; \
startup now routes through the RuntimeHostServices seam, but this build still only \
ships the {} host implementation. A future browser host must provide {}. Use the \
Browser Edition JS/TS bindings or the repository-maintained browser fixtures until \
that browser host implementation lands.",
host_services.kind().as_str(),
contract.diagnostic_requirements(),
)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserExecutionApiCapabilities {
pub has_abort_controller: bool,
pub has_fetch: bool,
pub has_webassembly: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserDomCapabilities {
pub has_document: bool,
pub has_window: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserStorageCapabilities {
pub has_indexed_db: bool,
pub has_local_storage: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserTransportCapabilities {
pub has_web_socket: bool,
pub has_web_transport: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserCapabilitySnapshot {
pub execution_api: BrowserExecutionApiCapabilities,
pub dom: BrowserDomCapabilities,
pub storage: BrowserStorageCapabilities,
pub transport: BrowserTransportCapabilities,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserRuntimeSupportClass {
DirectRuntimeSupported,
Unsupported,
}
impl BrowserRuntimeSupportClass {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::DirectRuntimeSupported => "direct_runtime_supported",
Self::Unsupported => "unsupported",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserRuntimeContext {
BrowserMainThread,
DedicatedWorker,
ServiceWorker,
SharedWorker,
Unknown,
}
impl BrowserRuntimeContext {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::BrowserMainThread => "browser_main_thread",
Self::DedicatedWorker => "dedicated_worker",
Self::ServiceWorker => "service_worker",
Self::SharedWorker => "shared_worker",
Self::Unknown => "unknown",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserRuntimeSupportReason {
MissingGlobalThis,
ServiceWorkerNotYetShipped,
SharedWorkerNotYetShipped,
UnsupportedRuntimeContext,
MissingWebAssembly,
Supported,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrowserRuntimeSupportDiagnostics {
pub supported: bool,
pub support_class: BrowserRuntimeSupportClass,
pub runtime_context: BrowserRuntimeContext,
pub reason: BrowserRuntimeSupportReason,
pub message: String,
pub guidance: Vec<String>,
pub capabilities: BrowserCapabilitySnapshot,
}
const BROWSER_SERVICE_WORKER_BROKER_CONTRACT_ID: &str = "wasm-service-worker-broker-contract-v1";
const BROWSER_SERVICE_WORKER_BROKER_LANE: &str = "lane.browser.service_worker.broker";
const BROWSER_SHARED_WORKER_COORDINATOR_CONTRACT_ID: &str =
"wasm-shared-worker-tenancy-lifecycle-v1";
const BROWSER_SHARED_WORKER_COORDINATOR_LANE: &str = "lane.browser.shared_worker.coordinator";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserWorkerFallbackTarget {
DedicatedWorkerDirectRuntime,
BrowserMainThreadDirectRuntime,
BridgeFallback,
}
impl BrowserWorkerFallbackTarget {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::DedicatedWorkerDirectRuntime => "lane.browser.dedicated_worker.direct_runtime",
Self::BrowserMainThreadDirectRuntime => "lane.browser.main_thread.direct_runtime",
Self::BridgeFallback => "bridge_fallback",
}
}
const fn fallback_lane_id(self) -> Option<BrowserExecutionLane> {
match self {
Self::DedicatedWorkerDirectRuntime => {
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime)
}
Self::BrowserMainThreadDirectRuntime => {
Some(BrowserExecutionLane::BrowserMainThreadDirectRuntime)
}
Self::BridgeFallback => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserServiceWorkerBrokerSupportReason {
Supported,
ServiceWorkerApiMissing,
DurableStoreUnavailableForRestartableProfile,
}
impl BrowserServiceWorkerBrokerSupportReason {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Supported => "supported",
Self::ServiceWorkerApiMissing => "service_worker_api_missing",
Self::DurableStoreUnavailableForRestartableProfile => {
"durable_store_unavailable_for_restartable_profile"
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrowserServiceWorkerBrokerSupportDiagnostics {
pub supported: bool,
pub contract_id: &'static str,
pub requested_lane: &'static str,
pub fallback_target: BrowserWorkerFallbackTarget,
pub fallback_lane_id: Option<BrowserExecutionLane>,
pub downgrade_order: Vec<BrowserWorkerFallbackTarget>,
pub host_role: BrowserExecutionHostRole,
pub runtime_context: BrowserRuntimeContext,
pub reason: BrowserServiceWorkerBrokerSupportReason,
pub message: String,
pub guidance: Vec<String>,
pub direct_runtime_reason: BrowserRuntimeSupportReason,
pub direct_execution_reason_code: BrowserExecutionReasonCode,
pub runtime_support: BrowserRuntimeSupportDiagnostics,
pub capabilities: BrowserCapabilitySnapshot,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserSharedWorkerCoordinatorSupportReason {
Supported,
SharedWorkerApiMissing,
}
impl BrowserSharedWorkerCoordinatorSupportReason {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Supported => "supported",
Self::SharedWorkerApiMissing => "shared_worker_api_missing",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrowserSharedWorkerCoordinatorSupportDiagnostics {
pub supported: bool,
pub contract_id: &'static str,
pub requested_lane: &'static str,
pub fallback_target: BrowserWorkerFallbackTarget,
pub fallback_lane_id: Option<BrowserExecutionLane>,
pub downgrade_order: Vec<BrowserWorkerFallbackTarget>,
pub host_role: BrowserExecutionHostRole,
pub runtime_context: BrowserRuntimeContext,
pub reason: BrowserSharedWorkerCoordinatorSupportReason,
pub message: String,
pub guidance: Vec<String>,
pub direct_runtime_reason: BrowserRuntimeSupportReason,
pub direct_execution_reason_code: BrowserExecutionReasonCode,
pub runtime_support: BrowserRuntimeSupportDiagnostics,
pub capabilities: BrowserCapabilitySnapshot,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserExecutionHostRole {
BrowserMainThread,
DedicatedWorker,
ServiceWorker,
SharedWorker,
NonBrowserOrUnknown,
}
impl BrowserExecutionHostRole {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::BrowserMainThread => "browser_main_thread",
Self::DedicatedWorker => "dedicated_worker",
Self::ServiceWorker => "service_worker",
Self::SharedWorker => "shared_worker",
Self::NonBrowserOrUnknown => "non_browser_or_unknown",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserExecutionLane {
BrowserMainThreadDirectRuntime,
DedicatedWorkerDirectRuntime,
Unsupported,
}
impl BrowserExecutionLane {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::BrowserMainThreadDirectRuntime => "lane.browser.main_thread.direct_runtime",
Self::DedicatedWorkerDirectRuntime => "lane.browser.dedicated_worker.direct_runtime",
Self::Unsupported => "lane.unsupported",
}
}
const fn lane_kind(self) -> BrowserExecutionLaneKind {
match self {
Self::Unsupported => BrowserExecutionLaneKind::Unsupported,
Self::BrowserMainThreadDirectRuntime | Self::DedicatedWorkerDirectRuntime => {
BrowserExecutionLaneKind::DirectRuntime
}
}
}
const fn lane_rank(self) -> u16 {
match self {
Self::BrowserMainThreadDirectRuntime => 10,
Self::DedicatedWorkerDirectRuntime => 20,
Self::Unsupported => 99,
}
}
const fn fallback_lane(self) -> Option<Self> {
match self {
Self::Unsupported => None,
Self::BrowserMainThreadDirectRuntime | Self::DedicatedWorkerDirectRuntime => {
Some(Self::Unsupported)
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserExecutionLaneKind {
DirectRuntime,
Unsupported,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrowserExecutionReasonCode {
Supported,
CandidateHostRoleMismatch,
CandidatePrerequisiteMissing,
ServiceWorkerDirectRuntimeNotShipped,
SharedWorkerDirectRuntimeNotShipped,
MissingGlobalThis,
MissingWebAssembly,
UnsupportedRuntimeContext,
NonBrowserRuntime,
}
impl BrowserExecutionReasonCode {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Supported => "supported",
Self::CandidateHostRoleMismatch => "candidate_host_role_mismatch",
Self::CandidatePrerequisiteMissing => "candidate_prerequisite_missing",
Self::ServiceWorkerDirectRuntimeNotShipped => {
"service_worker_direct_runtime_not_shipped"
}
Self::SharedWorkerDirectRuntimeNotShipped => "shared_worker_direct_runtime_not_shipped",
Self::MissingGlobalThis => "missing_global_this",
Self::MissingWebAssembly => "missing_webassembly",
Self::UnsupportedRuntimeContext => "unsupported_runtime_context",
Self::NonBrowserRuntime => "non_browser_runtime",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrowserExecutionLaneCandidate {
pub lane_id: BrowserExecutionLane,
pub lane_kind: BrowserExecutionLaneKind,
pub lane_rank: u16,
pub host_role: BrowserExecutionHostRole,
pub support_class: BrowserRuntimeSupportClass,
pub fallback_lane_id: Option<BrowserExecutionLane>,
pub available: bool,
pub selected: bool,
pub reason_code: BrowserExecutionReasonCode,
pub message: String,
pub guidance: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrowserExecutionLadderDiagnostics {
pub supported: bool,
pub preferred_lane: Option<BrowserExecutionLane>,
pub selected_lane: BrowserExecutionLane,
pub lane_kind: BrowserExecutionLaneKind,
pub lane_rank: u16,
pub host_role: BrowserExecutionHostRole,
pub support_class: BrowserRuntimeSupportClass,
pub runtime_context: BrowserRuntimeContext,
pub reason_code: BrowserExecutionReasonCode,
pub message: String,
pub guidance: Vec<String>,
pub fallback_lane_id: Option<BrowserExecutionLane>,
pub downgrade_order: Vec<BrowserExecutionLane>,
pub repro_command: String,
pub candidates: Vec<BrowserExecutionLaneCandidate>,
pub runtime_support: BrowserRuntimeSupportDiagnostics,
pub capabilities: BrowserCapabilitySnapshot,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserExecutionProbe {
pub has_global_this: bool,
pub runtime_context: BrowserRuntimeContext,
pub host_role: BrowserExecutionHostRole,
pub capabilities: BrowserCapabilitySnapshot,
}
impl BrowserExecutionProbe {
#[must_use]
pub const fn non_browser() -> Self {
Self {
has_global_this: false,
runtime_context: BrowserRuntimeContext::Unknown,
host_role: BrowserExecutionHostRole::NonBrowserOrUnknown,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: false,
has_fetch: false,
has_webassembly: false,
},
dom: BrowserDomCapabilities {
has_document: false,
has_window: false,
},
storage: BrowserStorageCapabilities {
has_indexed_db: false,
has_local_storage: false,
},
transport: BrowserTransportCapabilities {
has_web_socket: false,
has_web_transport: false,
},
},
}
}
#[must_use]
pub const fn browser_main_thread() -> Self {
Self {
has_global_this: true,
runtime_context: BrowserRuntimeContext::BrowserMainThread,
host_role: BrowserExecutionHostRole::BrowserMainThread,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: true,
has_fetch: true,
has_webassembly: true,
},
dom: BrowserDomCapabilities {
has_document: true,
has_window: true,
},
storage: browser_storage_capabilities_for_host_role(
BrowserExecutionHostRole::BrowserMainThread,
),
transport: BrowserTransportCapabilities {
has_web_socket: true,
has_web_transport: false,
},
},
}
}
#[must_use]
pub const fn dedicated_worker() -> Self {
Self {
has_global_this: true,
runtime_context: BrowserRuntimeContext::DedicatedWorker,
host_role: BrowserExecutionHostRole::DedicatedWorker,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: true,
has_fetch: true,
has_webassembly: true,
},
dom: BrowserDomCapabilities {
has_document: false,
has_window: false,
},
storage: browser_storage_capabilities_for_host_role(
BrowserExecutionHostRole::DedicatedWorker,
),
transport: BrowserTransportCapabilities {
has_web_socket: true,
has_web_transport: false,
},
},
}
}
#[must_use]
pub const fn service_worker() -> Self {
Self {
has_global_this: true,
runtime_context: BrowserRuntimeContext::ServiceWorker,
host_role: BrowserExecutionHostRole::ServiceWorker,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: true,
has_fetch: true,
has_webassembly: true,
},
dom: BrowserDomCapabilities {
has_document: false,
has_window: false,
},
storage: browser_storage_capabilities_for_host_role(
BrowserExecutionHostRole::ServiceWorker,
),
transport: BrowserTransportCapabilities {
has_web_socket: true,
has_web_transport: false,
},
},
}
}
#[must_use]
pub const fn shared_worker() -> Self {
Self {
has_global_this: true,
runtime_context: BrowserRuntimeContext::SharedWorker,
host_role: BrowserExecutionHostRole::SharedWorker,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: true,
has_fetch: true,
has_webassembly: true,
},
dom: BrowserDomCapabilities {
has_document: false,
has_window: false,
},
storage: browser_storage_capabilities_for_host_role(
BrowserExecutionHostRole::SharedWorker,
),
transport: BrowserTransportCapabilities {
has_web_socket: true,
has_web_transport: false,
},
},
}
}
}
const fn browser_storage_capabilities_for_host_role(
host_role: BrowserExecutionHostRole,
) -> BrowserStorageCapabilities {
match host_role {
BrowserExecutionHostRole::BrowserMainThread => BrowserStorageCapabilities {
has_indexed_db: true,
has_local_storage: true,
},
BrowserExecutionHostRole::DedicatedWorker
| BrowserExecutionHostRole::ServiceWorker
| BrowserExecutionHostRole::SharedWorker => BrowserStorageCapabilities {
has_indexed_db: true,
has_local_storage: false,
},
BrowserExecutionHostRole::NonBrowserOrUnknown => BrowserStorageCapabilities {
has_indexed_db: false,
has_local_storage: false,
},
}
}
#[cfg(target_arch = "wasm32")]
fn browser_capability_snapshot(global_object: &JsValue) -> BrowserCapabilitySnapshot {
BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: browser_global_has(global_object, "AbortController"),
has_fetch: browser_global_has(global_object, "fetch"),
has_webassembly: browser_global_has(global_object, "WebAssembly"),
},
dom: BrowserDomCapabilities {
has_document: browser_global_has(global_object, "document"),
has_window: browser_global_has(global_object, "window"),
},
storage: BrowserStorageCapabilities {
has_indexed_db: browser_global_has(global_object, "indexedDB"),
has_local_storage: browser_global_has(global_object, "localStorage"),
},
transport: BrowserTransportCapabilities {
has_web_socket: browser_global_has(global_object, "WebSocket"),
has_web_transport: browser_global_has(global_object, "WebTransport"),
},
}
}
#[cfg(target_arch = "wasm32")]
fn browser_global_has(global_object: &JsValue, key: &str) -> bool {
Reflect::has(global_object, &JsValue::from_str(key)).unwrap_or(false)
}
#[cfg(target_arch = "wasm32")]
fn browser_global_constructor_name(global_object: &JsValue) -> Option<String> {
let constructor = Reflect::get(global_object, &JsValue::from_str("constructor")).ok()?;
let name = Reflect::get(&constructor, &JsValue::from_str("name")).ok()?;
name.as_string()
}
#[cfg(target_arch = "wasm32")]
fn detect_browser_execution_probe() -> BrowserExecutionProbe {
let global_object = global();
let has_global_this = global_object.is_object();
let capabilities = browser_capability_snapshot(&global_object);
let constructor_name = browser_global_constructor_name(&global_object);
let host_role = match constructor_name.as_deref() {
Some("ServiceWorkerGlobalScope") => BrowserExecutionHostRole::ServiceWorker,
Some("SharedWorkerGlobalScope") => BrowserExecutionHostRole::SharedWorker,
Some("DedicatedWorkerGlobalScope") => BrowserExecutionHostRole::DedicatedWorker,
_ if capabilities.dom.has_window && capabilities.dom.has_document => {
BrowserExecutionHostRole::BrowserMainThread
}
_ => BrowserExecutionHostRole::NonBrowserOrUnknown,
};
let runtime_context = match host_role {
BrowserExecutionHostRole::BrowserMainThread => BrowserRuntimeContext::BrowserMainThread,
BrowserExecutionHostRole::DedicatedWorker => BrowserRuntimeContext::DedicatedWorker,
BrowserExecutionHostRole::ServiceWorker => BrowserRuntimeContext::ServiceWorker,
BrowserExecutionHostRole::SharedWorker => BrowserRuntimeContext::SharedWorker,
BrowserExecutionHostRole::NonBrowserOrUnknown => BrowserRuntimeContext::Unknown,
};
BrowserExecutionProbe {
has_global_this,
runtime_context,
host_role,
capabilities,
}
}
#[cfg(not(target_arch = "wasm32"))]
fn detect_browser_execution_probe() -> BrowserExecutionProbe {
BrowserExecutionProbe::non_browser()
}
fn browser_runtime_support_diagnostics(
probe: BrowserExecutionProbe,
supported: bool,
support_class: BrowserRuntimeSupportClass,
reason: BrowserRuntimeSupportReason,
message: &str,
guidance: &[&str],
) -> BrowserRuntimeSupportDiagnostics {
BrowserRuntimeSupportDiagnostics {
supported,
support_class,
runtime_context: probe.runtime_context,
reason,
message: message.to_string(),
guidance: guidance.iter().map(|entry| (*entry).to_string()).collect(),
capabilities: probe.capabilities,
}
}
fn browser_runtime_support_missing_global_this(
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSupportDiagnostics {
browser_runtime_support_diagnostics(
probe,
false,
BrowserRuntimeSupportClass::Unsupported,
BrowserRuntimeSupportReason::MissingGlobalThis,
"Rust Browser Edition runtime inspection could not find a browser global object.",
&[
"Run this inspection from a browser main-thread or dedicated-worker entrypoint.",
"Use the maintained Rust browser fixture when validating browser support outside a browser host.",
],
)
}
fn browser_runtime_support_not_yet_shipped(
probe: BrowserExecutionProbe,
reason: BrowserRuntimeSupportReason,
) -> BrowserRuntimeSupportDiagnostics {
let (message, guidance) = match reason {
BrowserRuntimeSupportReason::ServiceWorkerNotYetShipped => (
"Rust Browser Edition does not yet ship a service-worker direct-runtime lane.",
&[
"Keep Rust Browser Edition runtime creation out of service-worker hosts; the direct-runtime lane is intentionally fail-closed today.",
"Use the bounded service-worker broker helpers for registration, durable handoff, and fallback orchestration instead of widening the direct-runtime claim.",
][..],
),
BrowserRuntimeSupportReason::SharedWorkerNotYetShipped => (
"Rust Browser Edition does not yet ship a shared-worker direct-runtime lane.",
&[
"Keep Rust Browser Edition runtime creation out of shared-worker hosts; the direct-runtime lane is intentionally fail-closed today.",
"Use the bounded shared-worker coordinator helpers from a browser main-thread or dedicated-worker caller instead of widening the direct-runtime claim.",
][..],
),
BrowserRuntimeSupportReason::MissingGlobalThis
| BrowserRuntimeSupportReason::UnsupportedRuntimeContext
| BrowserRuntimeSupportReason::MissingWebAssembly
| BrowserRuntimeSupportReason::Supported => {
unreachable!("only not-yet-shipped reasons are valid here")
}
};
browser_runtime_support_diagnostics(
probe,
false,
BrowserRuntimeSupportClass::Unsupported,
reason,
message,
guidance,
)
}
fn browser_runtime_support_unsupported_context(
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSupportDiagnostics {
browser_runtime_support_diagnostics(
probe,
false,
BrowserRuntimeSupportClass::Unsupported,
BrowserRuntimeSupportReason::UnsupportedRuntimeContext,
"Rust Browser Edition inspection only recognizes browser main-thread and dedicated-worker direct-runtime contexts.",
&[
"Move the call into a browser main-thread or dedicated-worker entrypoint before expecting a direct runtime lane.",
],
)
}
fn browser_runtime_support_missing_webassembly(
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSupportDiagnostics {
browser_runtime_support_diagnostics(
probe,
false,
BrowserRuntimeSupportClass::Unsupported,
BrowserRuntimeSupportReason::MissingWebAssembly,
"Rust Browser Edition runtime inspection found no WebAssembly support in the current host.",
&[
"Enable WebAssembly in the target browser/runtime before expecting a direct runtime lane.",
],
)
}
fn browser_runtime_support_supported(
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSupportDiagnostics {
let message = match probe.runtime_context {
BrowserRuntimeContext::BrowserMainThread => {
"Rust Browser Edition runtime inspection found a browser main-thread direct-runtime context."
}
BrowserRuntimeContext::DedicatedWorker => {
"Rust Browser Edition runtime inspection found a dedicated-worker direct-runtime context."
}
BrowserRuntimeContext::ServiceWorker
| BrowserRuntimeContext::SharedWorker
| BrowserRuntimeContext::Unknown => {
unreachable!(
"supported browser runtime inspection requires a shipped direct-runtime context"
)
}
};
browser_runtime_support_diagnostics(
probe,
true,
BrowserRuntimeSupportClass::DirectRuntimeSupported,
BrowserRuntimeSupportReason::Supported,
message,
&[],
)
}
fn browser_runtime_support_from_probe(
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSupportDiagnostics {
if !probe.has_global_this {
return browser_runtime_support_missing_global_this(probe);
}
match probe.host_role {
BrowserExecutionHostRole::ServiceWorker => browser_runtime_support_not_yet_shipped(
probe,
BrowserRuntimeSupportReason::ServiceWorkerNotYetShipped,
),
BrowserExecutionHostRole::SharedWorker => browser_runtime_support_not_yet_shipped(
probe,
BrowserRuntimeSupportReason::SharedWorkerNotYetShipped,
),
BrowserExecutionHostRole::BrowserMainThread
| BrowserExecutionHostRole::DedicatedWorker
| BrowserExecutionHostRole::NonBrowserOrUnknown => {
if probe.runtime_context == BrowserRuntimeContext::Unknown {
return browser_runtime_support_unsupported_context(probe);
}
if !probe.capabilities.execution_api.has_webassembly {
return browser_runtime_support_missing_webassembly(probe);
}
browser_runtime_support_supported(probe)
}
}
}
const fn browser_execution_direct_lane_for_host_role(
host_role: BrowserExecutionHostRole,
) -> Option<BrowserExecutionLane> {
match host_role {
BrowserExecutionHostRole::BrowserMainThread => {
Some(BrowserExecutionLane::BrowserMainThreadDirectRuntime)
}
BrowserExecutionHostRole::DedicatedWorker => {
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime)
}
BrowserExecutionHostRole::ServiceWorker
| BrowserExecutionHostRole::SharedWorker
| BrowserExecutionHostRole::NonBrowserOrUnknown => None,
}
}
fn browser_execution_downgrade_order(
host_role: BrowserExecutionHostRole,
) -> Vec<BrowserExecutionLane> {
browser_execution_direct_lane_for_host_role(host_role).map_or_else(
|| vec![BrowserExecutionLane::Unsupported],
|direct| vec![direct, BrowserExecutionLane::Unsupported],
)
}
fn browser_execution_reason_from_support(
support: &BrowserRuntimeSupportDiagnostics,
host_role: BrowserExecutionHostRole,
) -> BrowserExecutionReasonCode {
match support.reason {
BrowserRuntimeSupportReason::MissingGlobalThis => {
BrowserExecutionReasonCode::MissingGlobalThis
}
BrowserRuntimeSupportReason::ServiceWorkerNotYetShipped => {
BrowserExecutionReasonCode::ServiceWorkerDirectRuntimeNotShipped
}
BrowserRuntimeSupportReason::SharedWorkerNotYetShipped => {
BrowserExecutionReasonCode::SharedWorkerDirectRuntimeNotShipped
}
BrowserRuntimeSupportReason::UnsupportedRuntimeContext => {
if host_role == BrowserExecutionHostRole::NonBrowserOrUnknown {
BrowserExecutionReasonCode::NonBrowserRuntime
} else {
BrowserExecutionReasonCode::UnsupportedRuntimeContext
}
}
BrowserRuntimeSupportReason::MissingWebAssembly => {
BrowserExecutionReasonCode::MissingWebAssembly
}
BrowserRuntimeSupportReason::Supported => BrowserExecutionReasonCode::Supported,
}
}
fn browser_execution_repro_command() -> String {
"PATH=/usr/bin:$PATH bash scripts/validate_rust_browser_consumer.sh".to_string()
}
fn browser_execution_host_mismatch_message(lane_id: BrowserExecutionLane) -> String {
match lane_id {
BrowserExecutionLane::BrowserMainThreadDirectRuntime => {
"lane.browser.main_thread.direct_runtime only applies when Rust Browser Edition is running on the browser main thread."
.to_string()
}
BrowserExecutionLane::DedicatedWorkerDirectRuntime => {
"lane.browser.dedicated_worker.direct_runtime only applies when Rust Browser Edition is already executing inside a dedicated worker."
.to_string()
}
BrowserExecutionLane::Unsupported => {
"lane.unsupported is the terminal fail-closed lane and is only selected after a truthful downgrade."
.to_string()
}
}
}
fn browser_execution_host_mismatch_guidance(lane_id: BrowserExecutionLane) -> Vec<String> {
match lane_id {
BrowserExecutionLane::BrowserMainThreadDirectRuntime => vec![
"Initialize the Rust browser surface from a browser main-thread entrypoint before pinning this lane."
.to_string(),
],
BrowserExecutionLane::DedicatedWorkerDirectRuntime => vec![
"Move the Rust browser surface into a dedicated-worker entrypoint before pinning this lane."
.to_string(),
],
BrowserExecutionLane::Unsupported => vec![
"Treat lane.unsupported as the terminal fail-closed lane when no truthful direct-runtime browser lane remains."
.to_string(),
],
}
}
fn browser_execution_missing_prerequisite_message(lane_id: BrowserExecutionLane) -> String {
match lane_id {
BrowserExecutionLane::Unsupported => {
"lane.unsupported remains the terminal fail-closed fallback if the current direct-runtime lane loses truthful prerequisites."
.to_string()
}
BrowserExecutionLane::BrowserMainThreadDirectRuntime
| BrowserExecutionLane::DedicatedWorkerDirectRuntime => {
format!(
"{} matches the current host role but is unavailable until the required Browser Edition prerequisites are restored.",
match lane_id {
BrowserExecutionLane::BrowserMainThreadDirectRuntime => {
"lane.browser.main_thread.direct_runtime"
}
BrowserExecutionLane::DedicatedWorkerDirectRuntime => {
"lane.browser.dedicated_worker.direct_runtime"
}
BrowserExecutionLane::Unsupported => unreachable!(),
}
)
}
}
}
fn browser_execution_missing_prerequisite_guidance(lane_id: BrowserExecutionLane) -> Vec<String> {
match lane_id {
BrowserExecutionLane::Unsupported => vec![
"Expect Rust Browser Edition to demote here instead of pretending a direct-runtime lane exists when prerequisites disappear."
.to_string(),
],
BrowserExecutionLane::BrowserMainThreadDirectRuntime
| BrowserExecutionLane::DedicatedWorkerDirectRuntime => vec![
"Restore the missing Browser Edition prerequisites before pinning this lane again."
.to_string(),
],
}
}
fn browser_execution_preferred_lane_mismatch(
preferred_lane: BrowserExecutionLane,
selected_lane: BrowserExecutionLane,
host_role: BrowserExecutionHostRole,
direct_lane_for_host: Option<BrowserExecutionLane>,
reason_code: BrowserExecutionReasonCode,
) -> (String, Vec<String>) {
if preferred_lane != BrowserExecutionLane::Unsupported
&& Some(preferred_lane) != direct_lane_for_host
{
return (
format!(
"Preferred lane {} is not truthful for host role {}, so Rust Browser Edition stayed on {}.",
preferred_lane.as_str(),
host_role.as_str(),
selected_lane.as_str(),
),
vec![format!(
"Use {} for this host role, or switch entrypoints before pinning {}.",
selected_lane.as_str(),
preferred_lane.as_str(),
)],
);
}
if selected_lane == BrowserExecutionLane::Unsupported {
return (
format!(
"Preferred lane {} could not be selected because Rust Browser Edition currently reports {} and stayed on {}.",
preferred_lane.as_str(),
reason_code.as_str(),
selected_lane.as_str(),
),
vec![format!(
"Restore the reported Browser Edition prerequisites before pinning {} again.",
preferred_lane.as_str(),
)],
);
}
(
format!(
"Preferred lane {} is a lower-priority fail-closed fallback, so Rust Browser Edition stayed on {}.",
preferred_lane.as_str(),
selected_lane.as_str(),
),
vec![format!(
"Only pin {} when you intentionally want the fail-closed fallback lane.",
preferred_lane.as_str(),
)],
)
}
struct BrowserExecutionLaneCandidateInput {
lane_id: BrowserExecutionLane,
host_role: BrowserExecutionHostRole,
support_class: BrowserRuntimeSupportClass,
available: bool,
selected: bool,
reason_code: BrowserExecutionReasonCode,
message: String,
guidance: Vec<String>,
}
fn create_browser_execution_lane_candidate(
input: BrowserExecutionLaneCandidateInput,
) -> BrowserExecutionLaneCandidate {
BrowserExecutionLaneCandidate {
lane_id: input.lane_id,
lane_kind: input.lane_id.lane_kind(),
lane_rank: input.lane_id.lane_rank(),
host_role: input.host_role,
support_class: input.support_class,
fallback_lane_id: input.lane_id.fallback_lane(),
available: input.available,
selected: input.selected,
reason_code: input.reason_code,
message: input.message,
guidance: input.guidance,
}
}
fn browser_execution_candidates(
selected_lane: BrowserExecutionLane,
host_role: BrowserExecutionHostRole,
support_class: BrowserRuntimeSupportClass,
selected_reason_code: BrowserExecutionReasonCode,
selected_message: &str,
selected_guidance: &[String],
) -> Vec<BrowserExecutionLaneCandidate> {
let direct_lane_for_host = browser_execution_direct_lane_for_host_role(host_role);
let lane_ids = [
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
BrowserExecutionLane::DedicatedWorkerDirectRuntime,
BrowserExecutionLane::Unsupported,
];
lane_ids
.into_iter()
.map(|lane_id| {
if lane_id == selected_lane {
return create_browser_execution_lane_candidate(
BrowserExecutionLaneCandidateInput {
lane_id,
host_role,
support_class,
available: true,
selected: true,
reason_code: selected_reason_code,
message: selected_message.to_string(),
guidance: selected_guidance.to_vec(),
},
);
}
let prerequisite_missing = if lane_id == BrowserExecutionLane::Unsupported {
selected_lane != BrowserExecutionLane::Unsupported
} else {
direct_lane_for_host == Some(lane_id)
&& selected_lane == BrowserExecutionLane::Unsupported
};
if prerequisite_missing {
return create_browser_execution_lane_candidate(
BrowserExecutionLaneCandidateInput {
lane_id,
host_role,
support_class,
available: false,
selected: false,
reason_code: BrowserExecutionReasonCode::CandidatePrerequisiteMissing,
message: browser_execution_missing_prerequisite_message(lane_id),
guidance: browser_execution_missing_prerequisite_guidance(lane_id),
},
);
}
create_browser_execution_lane_candidate(BrowserExecutionLaneCandidateInput {
lane_id,
host_role,
support_class,
available: false,
selected: false,
reason_code: BrowserExecutionReasonCode::CandidateHostRoleMismatch,
message: browser_execution_host_mismatch_message(lane_id),
guidance: browser_execution_host_mismatch_guidance(lane_id),
})
})
.collect()
}
fn build_browser_execution_ladder_from_probe(
preferred_lane: Option<BrowserExecutionLane>,
probe: BrowserExecutionProbe,
) -> BrowserExecutionLadderDiagnostics {
let runtime_support = browser_runtime_support_from_probe(probe);
let host_role = probe.host_role;
let direct_lane_for_host = browser_execution_direct_lane_for_host_role(host_role);
let selected_lane = runtime_support
.supported
.then_some(direct_lane_for_host)
.flatten()
.unwrap_or(BrowserExecutionLane::Unsupported);
let reason_code = browser_execution_reason_from_support(&runtime_support, host_role);
let mut message = runtime_support.message.clone();
let mut guidance = runtime_support.guidance.clone();
if let Some(preferred_lane) = preferred_lane.filter(|lane| *lane != selected_lane) {
let (mismatch_message, mismatch_guidance) = browser_execution_preferred_lane_mismatch(
preferred_lane,
selected_lane,
host_role,
direct_lane_for_host,
reason_code,
);
message = format!("{message} {mismatch_message}");
guidance.extend(mismatch_guidance);
}
let support_class = runtime_support.support_class;
let candidates = browser_execution_candidates(
selected_lane,
host_role,
support_class,
reason_code,
&message,
&guidance,
);
let capabilities = runtime_support.capabilities;
BrowserExecutionLadderDiagnostics {
supported: selected_lane != BrowserExecutionLane::Unsupported,
preferred_lane,
selected_lane,
lane_kind: selected_lane.lane_kind(),
lane_rank: selected_lane.lane_rank(),
host_role,
support_class,
runtime_context: runtime_support.runtime_context,
reason_code,
message,
guidance,
fallback_lane_id: selected_lane.fallback_lane(),
downgrade_order: browser_execution_downgrade_order(host_role),
repro_command: browser_execution_repro_command(),
candidates,
runtime_support,
capabilities,
}
}
fn browser_service_worker_broker_downgrade_order() -> Vec<BrowserWorkerFallbackTarget> {
vec![
BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime,
BrowserWorkerFallbackTarget::BrowserMainThreadDirectRuntime,
BrowserWorkerFallbackTarget::BridgeFallback,
]
}
fn browser_shared_worker_coordinator_downgrade_order(
host_role: BrowserExecutionHostRole,
) -> Vec<BrowserWorkerFallbackTarget> {
let mut targets = Vec::new();
if host_role == BrowserExecutionHostRole::DedicatedWorker {
targets.push(BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime);
}
if host_role == BrowserExecutionHostRole::BrowserMainThread {
targets.push(BrowserWorkerFallbackTarget::BrowserMainThreadDirectRuntime);
}
if !targets.contains(&BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime) {
targets.push(BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime);
}
if !targets.contains(&BrowserWorkerFallbackTarget::BrowserMainThreadDirectRuntime) {
targets.push(BrowserWorkerFallbackTarget::BrowserMainThreadDirectRuntime);
}
targets.push(BrowserWorkerFallbackTarget::BridgeFallback);
targets
}
fn browser_service_worker_broker_support_from_probe(
probe: BrowserExecutionProbe,
) -> BrowserServiceWorkerBrokerSupportDiagnostics {
let runtime_support = browser_runtime_support_from_probe(probe);
let capabilities = runtime_support.capabilities;
let downgrade_order = browser_service_worker_broker_downgrade_order();
let fallback_target = downgrade_order[0];
let direct_runtime_reason = runtime_support.reason;
let direct_execution_reason_code =
browser_execution_reason_from_support(&runtime_support, probe.host_role);
let (supported, reason, message, guidance) = if probe.host_role
!= BrowserExecutionHostRole::ServiceWorker
{
(
false,
BrowserServiceWorkerBrokerSupportReason::ServiceWorkerApiMissing,
"Rust Browser Edition service-worker broker preflight only admits service-worker hosts."
.to_string(),
vec![
"Call the bounded broker surface only from a service-worker entrypoint."
.to_string(),
format!(
"Keep direct BrowserRuntime creation on {} or {} when broker admission is unavailable.",
BrowserExecutionLane::DedicatedWorkerDirectRuntime.as_str(),
BrowserExecutionLane::BrowserMainThreadDirectRuntime.as_str()
),
],
)
} else if !probe.capabilities.storage.has_indexed_db {
(
false,
BrowserServiceWorkerBrokerSupportReason::DurableStoreUnavailableForRestartableProfile,
"Rust Browser Edition service-worker broker preflight found no durable store for the default restartable broker profile."
.to_string(),
vec![
"Restore IndexedDB-backed durability before claiming restartable broker progress."
.to_string(),
"Downgrade explicitly instead of pretending restartability without a durable substrate."
.to_string(),
],
)
} else {
(
true,
BrowserServiceWorkerBrokerSupportReason::Supported,
"Rust Browser Edition service-worker broker preflight found a bounded broker host surface; direct runtime creation remains fail-closed and work must hand off explicitly."
.to_string(),
vec![
"Keep direct BrowserRuntime creation out of the service-worker host itself."
.to_string(),
"Treat registration-scope and capability-manifest checks as explicit broker admission work on the JS helper surface."
.to_string(),
],
)
};
BrowserServiceWorkerBrokerSupportDiagnostics {
supported,
contract_id: BROWSER_SERVICE_WORKER_BROKER_CONTRACT_ID,
requested_lane: BROWSER_SERVICE_WORKER_BROKER_LANE,
fallback_target,
fallback_lane_id: fallback_target.fallback_lane_id(),
downgrade_order,
host_role: probe.host_role,
runtime_context: runtime_support.runtime_context,
reason,
message,
guidance,
direct_runtime_reason,
direct_execution_reason_code,
runtime_support,
capabilities,
}
}
fn browser_shared_worker_coordinator_support_from_probe(
probe: BrowserExecutionProbe,
) -> BrowserSharedWorkerCoordinatorSupportDiagnostics {
let runtime_support = browser_runtime_support_from_probe(probe);
let capabilities = runtime_support.capabilities;
let downgrade_order = browser_shared_worker_coordinator_downgrade_order(probe.host_role);
let fallback_target = downgrade_order[0];
let direct_runtime_reason = BrowserRuntimeSupportReason::SharedWorkerNotYetShipped;
let direct_execution_reason_code =
BrowserExecutionReasonCode::SharedWorkerDirectRuntimeNotShipped;
let (supported, reason, message, guidance) = match probe.host_role {
BrowserExecutionHostRole::BrowserMainThread | BrowserExecutionHostRole::DedicatedWorker => (
true,
BrowserSharedWorkerCoordinatorSupportReason::Supported,
"Rust Browser Edition shared-worker coordinator preflight found a truthful caller host; direct BrowserRuntime creation inside the shared-worker host remains fail-closed."
.to_string(),
vec![
"Call the bounded shared-worker coordinator only from a browser main-thread or dedicated-worker caller."
.to_string(),
"Treat same-origin script resolution, admission tuple checks, and protocol negotiation as explicit JS helper responsibilities."
.to_string(),
],
),
_ => (
false,
BrowserSharedWorkerCoordinatorSupportReason::SharedWorkerApiMissing,
"Rust Browser Edition shared-worker coordinator preflight only admits browser main-thread or dedicated-worker callers."
.to_string(),
vec![
"Move the coordinator attach flow into a browser main-thread or dedicated-worker caller before expecting a bounded coordinator surface."
.to_string(),
"Keep direct BrowserRuntime creation fail-closed inside the shared-worker host itself."
.to_string(),
],
),
};
BrowserSharedWorkerCoordinatorSupportDiagnostics {
supported,
contract_id: BROWSER_SHARED_WORKER_COORDINATOR_CONTRACT_ID,
requested_lane: BROWSER_SHARED_WORKER_COORDINATOR_LANE,
fallback_target,
fallback_lane_id: fallback_target.fallback_lane_id(),
downgrade_order,
host_role: probe.host_role,
runtime_context: runtime_support.runtime_context,
reason,
message,
guidance,
direct_runtime_reason,
direct_execution_reason_code,
runtime_support,
capabilities,
}
}
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum BrowserRuntimeBuildError {
#[error("{message}")]
Unsupported {
execution_ladder: BrowserExecutionLadderDiagnostics,
message: String,
},
#[error("failed to create preview browser runtime handle: {source}")]
RuntimeCreate {
execution_ladder: BrowserExecutionLadderDiagnostics,
source: WasmDispatchError,
},
}
impl BrowserRuntimeBuildError {
#[must_use]
pub fn execution_ladder(&self) -> &BrowserExecutionLadderDiagnostics {
match self {
Self::Unsupported {
execution_ladder, ..
}
| Self::RuntimeCreate {
execution_ladder, ..
} => execution_ladder,
}
}
}
#[derive(Debug)]
struct BrowserRuntimeInner {
dispatcher: RefCell<WasmExportDispatcher>,
runtime_handle: WasmHandleRef,
consumer_version: Option<WasmAbiVersion>,
execution_ladder: BrowserExecutionLadderDiagnostics,
}
#[derive(Debug, Clone)]
pub struct BrowserRuntime {
inner: Rc<BrowserRuntimeInner>,
}
impl BrowserRuntime {
fn new(
dispatcher: WasmExportDispatcher,
runtime_handle: WasmHandleRef,
consumer_version: Option<WasmAbiVersion>,
execution_ladder: BrowserExecutionLadderDiagnostics,
) -> Self {
Self {
inner: Rc::new(BrowserRuntimeInner {
dispatcher: RefCell::new(dispatcher),
runtime_handle,
consumer_version,
execution_ladder,
}),
}
}
#[must_use]
pub fn runtime_handle(&self) -> WasmHandleRef {
self.inner.runtime_handle
}
#[must_use]
pub fn consumer_version(&self) -> Option<WasmAbiVersion> {
self.inner.consumer_version
}
#[must_use]
pub fn execution_ladder(&self) -> &BrowserExecutionLadderDiagnostics {
&self.inner.execution_ladder
}
#[must_use]
pub fn dispatcher_diagnostics(&self) -> WasmDispatcherDiagnostics {
self.inner.dispatcher.borrow().diagnostic_snapshot()
}
pub fn enter_scope(&self, label: Option<&str>) -> Result<WasmHandleRef, WasmDispatchError> {
let mut dispatcher = self.inner.dispatcher.borrow_mut();
dispatcher.scope_enter(
&WasmScopeEnterBuilder::new(self.runtime_handle())
.label(label.unwrap_or("root"))
.build(),
self.consumer_version(),
)
}
pub fn close_scope(
&self,
scope: &WasmHandleRef,
) -> Result<WasmAbiOutcomeEnvelope, WasmDispatchError> {
self.inner
.dispatcher
.borrow_mut()
.scope_close(scope, self.consumer_version())
}
pub fn close(&self) -> Result<WasmAbiOutcomeEnvelope, WasmDispatchError> {
self.inner
.dispatcher
.borrow_mut()
.runtime_close(&self.inner.runtime_handle, self.consumer_version())
}
}
#[derive(Debug, Clone)]
pub struct BrowserRuntimeSelectionResult {
pub execution_ladder: BrowserExecutionLadderDiagnostics,
pub runtime: Option<BrowserRuntime>,
pub error: Option<BrowserRuntimeBuildError>,
}
impl BrowserRuntimeSelectionResult {
#[must_use]
pub fn runtime_available(&self) -> bool {
self.runtime.is_some()
}
}
fn build_browser_runtime_selection_from_probe(
preferred_lane: Option<BrowserExecutionLane>,
consumer_version: Option<WasmAbiVersion>,
abort_mode: WasmAbortPropagationMode,
probe: BrowserExecutionProbe,
) -> BrowserRuntimeSelectionResult {
let execution_ladder = build_browser_execution_ladder_from_probe(preferred_lane, probe);
if !execution_ladder.supported {
return BrowserRuntimeSelectionResult {
runtime: None,
error: Some(BrowserRuntimeBuildError::Unsupported {
message: execution_ladder.message.clone(),
execution_ladder: execution_ladder.clone(),
}),
execution_ladder,
};
}
let mut dispatcher = WasmExportDispatcher::new().with_abort_mode(abort_mode);
match dispatcher.runtime_create(consumer_version) {
Ok(runtime_handle) => BrowserRuntimeSelectionResult {
runtime: Some(BrowserRuntime::new(
dispatcher,
runtime_handle,
consumer_version,
execution_ladder.clone(),
)),
error: None,
execution_ladder,
},
Err(source) => BrowserRuntimeSelectionResult {
runtime: None,
error: Some(BrowserRuntimeBuildError::RuntimeCreate {
execution_ladder: execution_ladder.clone(),
source,
}),
execution_ladder,
},
}
}
#[derive(Debug, Clone)]
pub struct BrowserRuntimeBuilder {
preferred_lane: Option<BrowserExecutionLane>,
consumer_version: Option<WasmAbiVersion>,
abort_mode: WasmAbortPropagationMode,
}
impl Default for BrowserRuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl BrowserRuntimeBuilder {
#[must_use]
pub fn new() -> Self {
Self {
preferred_lane: None,
consumer_version: None,
abort_mode: WasmAbortPropagationMode::Bidirectional,
}
}
#[must_use]
pub fn preferred_lane(mut self, lane: BrowserExecutionLane) -> Self {
self.preferred_lane = Some(lane);
self
}
#[must_use]
pub fn automatic_lane(mut self) -> Self {
self.preferred_lane = None;
self
}
#[must_use]
pub fn consumer_version(mut self, version: WasmAbiVersion) -> Self {
self.consumer_version = Some(version);
self
}
#[must_use]
pub fn abort_mode(mut self, mode: WasmAbortPropagationMode) -> Self {
self.abort_mode = mode;
self
}
#[must_use]
pub fn inspect_execution_ladder(self) -> BrowserExecutionLadderDiagnostics {
build_browser_execution_ladder_from_probe(
self.preferred_lane,
detect_browser_execution_probe(),
)
}
#[must_use]
pub fn build_selection(self) -> BrowserRuntimeSelectionResult {
build_browser_runtime_selection_from_probe(
self.preferred_lane,
self.consumer_version,
self.abort_mode,
detect_browser_execution_probe(),
)
}
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<BrowserRuntime, BrowserRuntimeBuildError> {
let selection = self.build_selection();
match (selection.runtime, selection.error) {
(Some(runtime), None) => Ok(runtime),
(None | Some(_), Some(error)) => Err(error),
(None, None) => Err(BrowserRuntimeBuildError::Unsupported {
message: selection.execution_ladder.message.clone(),
execution_ladder: selection.execution_ladder,
}),
}
}
}
#[derive(Clone)]
pub struct RuntimeBuilder {
config: RuntimeConfig,
reactor: Option<Arc<dyn Reactor>>,
io_driver: Option<IoDriverHandle>,
timer_driver: Option<TimerDriverHandle>,
entropy_source: Option<Arc<dyn EntropySource>>,
host_services: Arc<dyn RuntimeHostServices>,
}
impl RuntimeBuilder {
#[must_use]
pub fn new() -> Self {
Self {
config: RuntimeConfig::default(),
reactor: None,
io_driver: None,
timer_driver: None,
entropy_source: None,
host_services: default_runtime_host_services(),
}
}
#[must_use]
pub fn browser() -> BrowserRuntimeBuilder {
BrowserRuntimeBuilder::new()
}
#[must_use]
pub fn worker_threads(mut self, n: usize) -> Self {
self.config.worker_threads = n;
self
}
#[must_use]
pub fn obligation_leak_response(
mut self,
response: crate::runtime::config::ObligationLeakResponse,
) -> Self {
self.config.obligation_leak_response = response;
self
}
#[must_use]
pub fn thread_stack_size(mut self, size: usize) -> Self {
self.config.thread_stack_size = size;
self
}
#[must_use]
pub fn thread_name_prefix(mut self, prefix: impl Into<String>) -> Self {
self.config.thread_name_prefix = prefix.into();
self
}
#[must_use]
pub fn global_queue_limit(mut self, limit: usize) -> Self {
self.config.global_queue_limit = limit;
self
}
#[must_use]
pub fn steal_batch_size(mut self, size: usize) -> Self {
self.config.steal_batch_size = size;
self
}
#[must_use]
pub fn logical_clock_mode(mut self, mode: LogicalClockMode) -> Self {
self.config.logical_clock_mode = Some(mode);
self
}
#[must_use]
pub fn cancel_attribution_config(mut self, config: CancelAttributionConfig) -> Self {
self.config.cancel_attribution = config;
self
}
#[must_use]
pub fn blocking_threads(mut self, min: usize, max: usize) -> Self {
self.config.blocking.min_threads = min;
self.config.blocking.max_threads = max;
self
}
#[must_use]
pub fn enable_parking(mut self, enable: bool) -> Self {
self.config.enable_parking = enable;
self
}
#[must_use]
pub fn poll_budget(mut self, budget: u32) -> Self {
self.config.poll_budget = budget;
self
}
#[must_use]
pub fn browser_ready_handoff_limit(mut self, limit: usize) -> Self {
self.config.browser_ready_handoff_limit = limit;
self
}
#[must_use]
pub fn browser_worker_offload(
mut self,
config: crate::runtime::config::BrowserWorkerOffloadConfig,
) -> Self {
self.config.browser_worker_offload = config;
self
}
#[must_use]
pub fn browser_worker_offload_enabled(mut self, enabled: bool) -> Self {
self.config.browser_worker_offload.enabled = enabled;
self
}
#[must_use]
pub fn browser_worker_offload_limits(
mut self,
min_task_cost: u32,
max_in_flight: usize,
) -> Self {
self.config.browser_worker_offload.min_task_cost = min_task_cost;
self.config.browser_worker_offload.max_in_flight = max_in_flight;
self
}
#[must_use]
pub fn browser_worker_transfer_mode(
mut self,
mode: crate::runtime::config::WorkerTransferMode,
) -> Self {
self.config.browser_worker_offload.transfer_mode = mode;
self
}
#[must_use]
pub fn browser_worker_cancellation_mode(
mut self,
mode: crate::runtime::config::WorkerCancellationMode,
) -> Self {
self.config.browser_worker_offload.cancellation_mode = mode;
self
}
#[must_use]
pub fn cancel_lane_max_streak(mut self, max_streak: usize) -> Self {
self.config.cancel_lane_max_streak = max_streak;
self
}
#[must_use]
pub fn enable_governor(mut self, enable: bool) -> Self {
self.config.enable_governor = enable;
self
}
#[must_use]
pub fn governor_interval(mut self, interval: u32) -> Self {
self.config.governor_interval = interval;
self
}
#[must_use]
pub fn enable_adaptive_cancel_streak(mut self, enable: bool) -> Self {
self.config.enable_adaptive_cancel_streak = enable;
self
}
#[must_use]
pub fn adaptive_cancel_streak_epoch_steps(mut self, steps: u32) -> Self {
self.config.adaptive_cancel_streak_epoch_steps = steps;
self
}
#[must_use]
pub fn root_region_limits(mut self, limits: RegionLimits) -> Self {
self.config.root_region_limits = Some(limits);
self
}
#[must_use]
pub fn clear_root_region_limits(mut self) -> Self {
self.config.root_region_limits = None;
self
}
#[must_use]
pub fn on_thread_start<F>(mut self, f: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.config.on_thread_start = Some(Arc::new(f));
self
}
#[must_use]
pub fn on_thread_stop<F>(mut self, f: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.config.on_thread_stop = Some(Arc::new(f));
self
}
#[must_use]
pub fn metrics<M: MetricsProvider>(mut self, provider: M) -> Self {
self.config.metrics_provider = Arc::new(provider);
self
}
#[must_use]
pub fn observability(mut self, config: ObservabilityConfig) -> Self {
self.config.observability = Some(config);
self
}
#[must_use]
pub fn deadline_monitoring<F>(mut self, f: F) -> Self
where
F: FnOnce(DeadlineMonitoringBuilder) -> DeadlineMonitoringBuilder,
{
let builder = f(DeadlineMonitoringBuilder::new());
let (config, handler) = builder.finish();
let handler =
handler.or_else(|| {
if config.enabled {
Some(Arc::new(default_warning_handler)
as Arc<dyn Fn(DeadlineWarning) + Send + Sync>)
} else {
None
}
});
self.config.deadline_monitor = Some(config);
self.config.deadline_warning_handler = handler;
self
}
#[allow(clippy::result_large_err)]
pub fn with_env_overrides(mut self) -> Result<Self, Error> {
crate::runtime::env_config::apply_env_overrides(&mut self.config).map_err(|e| {
Error::new(crate::error::ErrorKind::ConfigError).with_message(e.to_string())
})?;
Ok(self)
}
#[cfg(feature = "config-file")]
#[allow(clippy::result_large_err)]
pub fn from_toml(path: impl AsRef<std::path::Path>) -> Result<Self, Error> {
let toml_config =
crate::runtime::env_config::parse_toml_file(path.as_ref()).map_err(|e| {
Error::new(crate::error::ErrorKind::ConfigError).with_message(e.to_string())
})?;
let mut config = RuntimeConfig::default();
crate::runtime::env_config::apply_toml_config(&mut config, &toml_config);
Ok(Self {
config,
reactor: None,
io_driver: None,
timer_driver: None,
entropy_source: None,
host_services: default_runtime_host_services(),
})
}
#[cfg(feature = "config-file")]
#[allow(clippy::result_large_err)]
pub fn from_toml_str(toml: &str) -> Result<Self, Error> {
let toml_config = crate::runtime::env_config::parse_toml_str(toml).map_err(|e| {
Error::new(crate::error::ErrorKind::ConfigError).with_message(e.to_string())
})?;
let mut config = RuntimeConfig::default();
crate::runtime::env_config::apply_toml_config(&mut config, &toml_config);
Ok(Self {
config,
reactor: None,
io_driver: None,
timer_driver: None,
entropy_source: None,
host_services: default_runtime_host_services(),
})
}
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<Runtime, Error> {
let Self {
config,
reactor,
io_driver,
timer_driver,
entropy_source,
host_services,
} = self;
Runtime::with_config_and_platform(
config,
reactor,
io_driver,
timer_driver,
entropy_source,
host_services.as_ref(),
)
}
#[must_use]
pub fn inspect_browser_execution_ladder(&self) -> BrowserExecutionLadderDiagnostics {
let _ = self;
build_browser_execution_ladder_from_probe(None, detect_browser_execution_probe())
}
#[must_use]
pub fn inspect_browser_execution_ladder_for_probe(
&self,
probe: BrowserExecutionProbe,
) -> BrowserExecutionLadderDiagnostics {
let _ = self;
build_browser_execution_ladder_from_probe(None, probe)
}
#[must_use]
pub fn inspect_browser_execution_ladder_with_preferred_lane(
&self,
preferred_lane: BrowserExecutionLane,
) -> BrowserExecutionLadderDiagnostics {
let _ = self;
build_browser_execution_ladder_from_probe(
Some(preferred_lane),
detect_browser_execution_probe(),
)
}
#[must_use]
pub fn inspect_browser_execution_ladder_with_preferred_lane_for_probe(
&self,
probe: BrowserExecutionProbe,
preferred_lane: BrowserExecutionLane,
) -> BrowserExecutionLadderDiagnostics {
let _ = self;
build_browser_execution_ladder_from_probe(Some(preferred_lane), probe)
}
#[must_use]
pub fn inspect_browser_service_worker_broker_support(
&self,
) -> BrowserServiceWorkerBrokerSupportDiagnostics {
let _ = self;
browser_service_worker_broker_support_from_probe(detect_browser_execution_probe())
}
#[must_use]
pub fn inspect_browser_service_worker_broker_support_for_probe(
&self,
probe: BrowserExecutionProbe,
) -> BrowserServiceWorkerBrokerSupportDiagnostics {
let _ = self;
browser_service_worker_broker_support_from_probe(probe)
}
#[must_use]
pub fn inspect_browser_shared_worker_coordinator_support(
&self,
) -> BrowserSharedWorkerCoordinatorSupportDiagnostics {
let _ = self;
browser_shared_worker_coordinator_support_from_probe(detect_browser_execution_probe())
}
#[must_use]
pub fn inspect_browser_shared_worker_coordinator_support_for_probe(
&self,
probe: BrowserExecutionProbe,
) -> BrowserSharedWorkerCoordinatorSupportDiagnostics {
let _ = self;
browser_shared_worker_coordinator_support_from_probe(probe)
}
#[must_use]
pub fn with_reactor(mut self, reactor: Arc<dyn Reactor>) -> Self {
self.reactor = Some(reactor);
self
}
#[must_use]
pub fn with_io_driver(mut self, driver: IoDriverHandle) -> Self {
self.io_driver = Some(driver);
self
}
#[must_use]
pub fn with_timer_driver(mut self, driver: TimerDriverHandle) -> Self {
self.timer_driver = Some(driver);
self
}
#[must_use]
pub fn with_entropy_source(mut self, source: Arc<dyn EntropySource>) -> Self {
self.entropy_source = Some(source);
self
}
#[must_use]
pub fn current_thread() -> Self {
Self::new().worker_threads(1)
}
#[must_use]
pub fn multi_thread() -> Self {
Self::new()
}
#[must_use]
pub fn high_throughput() -> Self {
let workers = RuntimeConfig::default_worker_threads()
.saturating_mul(2)
.max(1);
Self::new().worker_threads(workers).steal_batch_size(32)
}
#[must_use]
pub fn low_latency() -> Self {
Self::new().steal_batch_size(4).poll_budget(32)
}
}
pub struct DeadlineMonitoringBuilder {
config: MonitorConfig,
on_warning: Option<Arc<dyn Fn(DeadlineWarning) + Send + Sync>>,
}
impl DeadlineMonitoringBuilder {
fn new() -> Self {
Self {
config: MonitorConfig::default(),
on_warning: None,
}
}
#[must_use]
pub fn config(mut self, config: MonitorConfig) -> Self {
self.config = config;
self
}
#[must_use]
pub fn check_interval(mut self, interval: Duration) -> Self {
self.config.check_interval = interval;
self
}
#[must_use]
pub fn warning_threshold_fraction(mut self, fraction: f64) -> Self {
self.config.warning_threshold_fraction = fraction;
self
}
#[must_use]
pub fn checkpoint_timeout(mut self, timeout: Duration) -> Self {
self.config.checkpoint_timeout = timeout;
self
}
#[must_use]
pub fn adaptive_config(mut self, config: AdaptiveDeadlineConfig) -> Self {
self.config.adaptive = config;
self
}
#[must_use]
pub fn adaptive_enabled(mut self, enabled: bool) -> Self {
self.config.adaptive.adaptive_enabled = enabled;
self
}
#[must_use]
pub fn adaptive_warning_percentile(mut self, percentile: f64) -> Self {
self.config.adaptive.warning_percentile = percentile;
self
}
#[must_use]
pub fn adaptive_min_samples(mut self, min_samples: usize) -> Self {
self.config.adaptive.min_samples = min_samples;
self
}
#[must_use]
pub fn adaptive_max_history(mut self, max_history: usize) -> Self {
self.config.adaptive.max_history = max_history;
self
}
#[must_use]
pub fn adaptive_fallback_threshold(mut self, threshold: Duration) -> Self {
self.config.adaptive.fallback_threshold = threshold;
self
}
#[must_use]
pub fn enabled(mut self, enabled: bool) -> Self {
self.config.enabled = enabled;
self
}
#[must_use]
pub fn on_warning<F>(mut self, f: F) -> Self
where
F: Fn(DeadlineWarning) + Send + Sync + 'static,
{
self.on_warning = Some(Arc::new(f));
self
}
#[allow(clippy::type_complexity)]
fn finish(
self,
) -> (
MonitorConfig,
Option<Arc<dyn Fn(DeadlineWarning) + Send + Sync>>,
) {
(self.config, self.on_warning)
}
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub struct Runtime {
inner: Arc<RuntimeInner>,
}
impl Runtime {
#[allow(clippy::result_large_err)]
pub fn with_config(config: RuntimeConfig) -> Result<Self, Error> {
let host_services = default_runtime_host_services();
Self::with_config_and_platform(config, None, None, None, None, host_services.as_ref())
}
#[allow(clippy::result_large_err)]
pub fn with_config_and_reactor(
config: RuntimeConfig,
reactor: Option<Arc<dyn Reactor>>,
) -> Result<Self, Error> {
let host_services = default_runtime_host_services();
Self::with_config_and_platform(config, reactor, None, None, None, host_services.as_ref())
}
#[allow(clippy::result_large_err)]
fn with_config_and_platform(
mut config: RuntimeConfig,
reactor: Option<Arc<dyn Reactor>>,
io_driver: Option<IoDriverHandle>,
timer_driver: Option<TimerDriverHandle>,
entropy_source: Option<Arc<dyn EntropySource>>,
host_services: &dyn RuntimeHostServices,
) -> Result<Self, Error> {
config.normalize();
#[cfg(target_arch = "wasm32")]
{
let _ = (reactor, io_driver, timer_driver, entropy_source);
Err(Error::new(crate::error::ErrorKind::ConfigError)
.with_message(unsupported_browser_bootstrap_message(host_services)))
}
#[cfg(not(target_arch = "wasm32"))]
{
let (inner, workers) = RuntimeInner::new(
config,
reactor,
io_driver,
timer_driver,
entropy_source,
host_services,
);
let inner = Arc::new(inner);
let worker_threads = host_services.spawn_workers(&inner, workers).map_err(|e| {
Error::new(crate::error::ErrorKind::Internal)
.with_message(format!("runtime init: {e}"))
})?;
*lock_state(&inner.worker_threads) = worker_threads;
Ok(Self { inner })
}
}
#[must_use]
pub fn handle(&self) -> RuntimeHandle {
RuntimeHandle::strong(Arc::clone(&self.inner))
}
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _guard = ScopedRuntimeHandle::new(self.handle());
run_future_with_budget(future, self.inner.config.poll_budget)
}
pub(crate) fn block_on_with_cx<F: Future>(
&self,
request_cx: crate::cx::Cx,
future: F,
) -> F::Output {
let _runtime_guard = ScopedRuntimeHandle::new(self.handle());
let _cx_guard = crate::cx::Cx::set_current(Some(request_cx));
run_future_with_budget(future, self.inner.config.poll_budget)
}
pub(crate) fn block_on_current_with_cx<F: Future>(
request_cx: crate::cx::Cx,
future: F,
) -> Option<F::Output> {
let handle = Self::current_handle()?;
let inner = handle.try_inner().ok()?;
let _cx_guard = crate::cx::Cx::set_current(Some(request_cx));
Some(run_future_with_budget(future, inner.config.poll_budget))
}
#[must_use]
pub(crate) fn request_cx_with_budget(&self, budget: Budget) -> crate::cx::Cx {
build_request_cx_from_inner(&self.inner, budget)
}
#[must_use]
pub(crate) fn current_request_cx_with_budget(budget: Budget) -> Option<crate::cx::Cx> {
let handle = Self::current_handle()?;
let inner = handle.try_inner().ok()?;
Some(build_request_cx_from_inner(&inner, budget))
}
#[must_use]
pub fn current_handle() -> Option<RuntimeHandle> {
CURRENT_RUNTIME_HANDLE
.try_with(|cell| cell.borrow().clone())
.unwrap_or(None)
}
#[must_use]
pub fn config(&self) -> &RuntimeConfig {
&self.inner.config
}
#[must_use]
pub fn is_quiescent(&self) -> bool {
let guard = self
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.is_quiescent()
}
pub fn spawn_blocking<F>(
&self,
f: F,
) -> Option<crate::runtime::blocking_pool::BlockingTaskHandle>
where
F: FnOnce() + Send + 'static,
{
self.inner.blocking_pool.as_ref().map(|pool| pool.spawn(f))
}
#[must_use]
pub fn blocking_handle(&self) -> Option<crate::runtime::blocking_pool::BlockingPoolHandle> {
self.inner.blocking_handle()
}
}
#[derive(Clone)]
enum RuntimeHandleRef {
Strong(Arc<RuntimeInner>),
Weak(Weak<RuntimeInner>),
}
#[derive(Clone)]
pub struct RuntimeHandle {
inner: RuntimeHandleRef,
}
impl RuntimeHandle {
fn strong(inner: Arc<RuntimeInner>) -> Self {
Self {
inner: RuntimeHandleRef::Strong(inner),
}
}
fn weak(inner: &Arc<RuntimeInner>) -> Self {
Self {
inner: RuntimeHandleRef::Weak(Arc::downgrade(inner)),
}
}
fn try_inner(&self) -> Result<Arc<RuntimeInner>, SpawnError> {
match &self.inner {
RuntimeHandleRef::Strong(inner) => Ok(Arc::clone(inner)),
RuntimeHandleRef::Weak(inner) => inner.upgrade().ok_or(SpawnError::RuntimeUnavailable),
}
}
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.try_spawn(future)
.expect("failed to create runtime task")
}
pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, SpawnError>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.try_inner()?.spawn(future)
}
pub fn spawn_with_cx<F, Fut>(&self, f: F)
where
F: FnOnce(crate::cx::Cx) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.try_spawn_with_cx(f)
.expect("failed to spawn task with cx");
}
pub fn try_spawn_with_cx<F, Fut>(&self, f: F) -> Result<(), SpawnError>
where
F: FnOnce(crate::cx::Cx) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.try_inner()?.spawn_with_cx(f)
}
pub fn spawn_blocking<F>(
&self,
f: F,
) -> Option<crate::runtime::blocking_pool::BlockingTaskHandle>
where
F: FnOnce() + Send + 'static,
{
let inner = self.try_inner().ok()?;
inner.blocking_pool.as_ref().map(|pool| pool.spawn(f))
}
#[must_use]
pub fn blocking_handle(&self) -> Option<crate::runtime::blocking_pool::BlockingPoolHandle> {
self.try_inner().ok()?.blocking_handle()
}
}
pub struct JoinHandle<T> {
state: Arc<Mutex<JoinState<T>>>,
completed: bool,
}
impl<T> JoinHandle<T> {
fn new(state: Arc<Mutex<JoinState<T>>>) -> Self {
Self {
state,
completed: false,
}
}
#[must_use]
pub fn is_finished(&self) -> bool {
if self.completed {
return true;
}
let guard = lock_state(&self.state);
guard.result.is_some() || Arc::strong_count(&self.state) == 1
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
assert!(
!this.completed,
"runtime::JoinHandle polled after completion"
);
let mut guard = lock_state(&this.state);
match guard.result.take() {
None => {
if Arc::strong_count(&this.state) == 1 {
this.completed = true;
drop(guard);
panic!("task was dropped or cancelled before completion"); }
if !guard
.waker
.as_ref()
.is_some_and(|w| w.will_wake(cx.waker()))
{
guard.waker = Some(cx.waker().clone());
}
Poll::Pending
}
Some(Ok(output)) => {
this.completed = true;
Poll::Ready(output)
}
Some(Err(payload)) => {
this.completed = true;
drop(guard);
std::panic::resume_unwind(payload)
}
}
}
}
#[pin_project::pin_project]
struct CatchUnwind<F> {
#[pin]
inner: F,
}
impl<F: Future> Future for CatchUnwind<F> {
type Output = std::thread::Result<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
this.inner.as_mut().poll(cx)
}));
match result {
Ok(Poll::Pending) => Poll::Pending,
Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
Err(payload) => Poll::Ready(Err(payload)),
}
}
}
struct RuntimeInner {
config: RuntimeConfig,
state: Arc<crate::sync::ContendedMutex<RuntimeState>>,
scheduler: ThreeLaneScheduler,
worker_threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
root_region: crate::types::RegionId,
blocking_pool: Option<crate::runtime::blocking_pool::BlockingPool>,
deadline_monitor_shutdown: Option<Arc<std::sync::atomic::AtomicBool>>,
deadline_monitor_thread: Option<std::thread::JoinHandle<()>>,
}
impl RuntimeInner {
fn initialize_runtime_state(
config: &RuntimeConfig,
reactor: Option<Arc<dyn Reactor>>,
io_driver: Option<IoDriverHandle>,
timer_driver: Option<TimerDriverHandle>,
entropy_source: Option<Arc<dyn EntropySource>>,
) -> RuntimeState {
let mut runtime_state = reactor.map_or_else(
|| RuntimeState::new_with_metrics(config.metrics_provider.clone()),
|reactor| {
RuntimeState::with_reactor_and_metrics(reactor, config.metrics_provider.clone())
},
);
if let Some(driver) = io_driver {
runtime_state.set_io_driver(driver);
}
if let Some(driver) = timer_driver {
runtime_state.set_timer_driver(driver);
}
if let Some(source) = entropy_source {
runtime_state.set_entropy_source(source);
}
runtime_state
}
fn initialize_root_region(
config: &RuntimeConfig,
state: &Arc<crate::sync::ContendedMutex<RuntimeState>>,
) -> crate::types::RegionId {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(observability) = config.observability.clone() {
guard.set_observability_config(observability);
}
if let Some(mode) = config.logical_clock_mode.clone() {
guard.set_logical_clock_mode(mode);
}
guard.set_cancel_attribution_config(config.cancel_attribution);
guard.set_obligation_leak_response(config.obligation_leak_response);
guard.set_leak_escalation(config.leak_escalation);
if guard.timer_driver().is_none() {
guard.set_timer_driver(TimerDriverHandle::with_wall_clock());
}
let root = guard.create_root_region(Budget::INFINITE);
if let Some(limits) = config.root_region_limits.clone() {
let _ = guard.set_region_limits(root, limits);
}
root
}
fn new(
config: RuntimeConfig,
reactor: Option<Arc<dyn Reactor>>,
io_driver: Option<IoDriverHandle>,
timer_driver: Option<TimerDriverHandle>,
entropy_source: Option<Arc<dyn EntropySource>>,
host_services: &dyn RuntimeHostServices,
) -> (Self, Vec<ThreeLaneWorker>) {
let runtime_state = Self::initialize_runtime_state(
&config,
reactor,
io_driver,
timer_driver,
entropy_source,
);
let state = Arc::new(crate::sync::ContendedMutex::new(
"runtime_state",
runtime_state,
));
let root_region = Self::initialize_root_region(&config, &state);
let mut scheduler = ThreeLaneScheduler::new_with_options(
config.worker_threads,
&state,
config.cancel_lane_max_streak,
config.enable_governor,
config.governor_interval,
);
scheduler.set_steal_batch_size(config.steal_batch_size);
scheduler.set_enable_parking(config.enable_parking);
scheduler.set_global_queue_limit(config.global_queue_limit);
scheduler.set_browser_ready_handoff_limit(config.browser_ready_handoff_limit);
scheduler.set_adaptive_cancel_streak(
config.enable_adaptive_cancel_streak,
config.adaptive_cancel_streak_epoch_steps,
);
let workers = scheduler.take_workers();
let deadline_monitor = host_services.start_deadline_monitor(&config, &state);
let blocking_pool = Self::create_blocking_pool(&config);
if let Some(pool) = blocking_pool.as_ref() {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.set_blocking_pool(pool.handle());
}
(
Self {
config,
state,
scheduler,
worker_threads: Mutex::new(Vec::new()),
root_region,
blocking_pool,
deadline_monitor_shutdown: deadline_monitor.shutdown,
deadline_monitor_thread: deadline_monitor.thread,
},
workers,
)
}
fn create_blocking_pool(
config: &RuntimeConfig,
) -> Option<crate::runtime::blocking_pool::BlockingPool> {
if config.blocking.max_threads == 0 {
return None;
}
let options = crate::runtime::blocking_pool::BlockingPoolOptions {
idle_timeout: Duration::from_secs(10),
thread_name_prefix: format!("{}-blocking", config.thread_name_prefix),
on_thread_start: config.on_thread_start.clone(),
on_thread_stop: config.on_thread_stop.clone(),
..Default::default()
};
Some(crate::runtime::blocking_pool::BlockingPool::with_config(
config.blocking.min_threads,
config.blocking.max_threads,
options,
))
}
fn spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, SpawnError>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let join_state = Arc::new(Mutex::new(JoinState::new()));
let join_state_for_task = Arc::clone(&join_state);
let wrapped = async move {
let result = CatchUnwind { inner: future }.await;
complete_task(&join_state_for_task, result);
};
let task_id = {
let mut guard = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard.create_task(self.root_region, Budget::new(), wrapped)?;
task_id
};
self.scheduler.inject_ready(task_id, Budget::new().priority);
Ok(JoinHandle::new(join_state))
}
fn spawn_with_cx<F, Fut>(&self, f: F) -> Result<(), SpawnError>
where
F: FnOnce(crate::cx::Cx) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
use crate::runtime::StoredTask;
use crate::types::Outcome;
let task_id = {
let mut guard = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle, cx, _result_tx) =
guard.create_task_infrastructure::<()>(self.root_region, Budget::new(), false)?;
let future = f(cx);
let wrapped = async move {
future.await;
Outcome::Ok(())
};
guard.store_spawned_task(task_id, StoredTask::new_with_id(wrapped, task_id));
drop(guard);
task_id
};
self.scheduler.inject_ready(task_id, Budget::new().priority);
Ok(())
}
fn blocking_handle(&self) -> Option<crate::runtime::blocking_pool::BlockingPoolHandle> {
self.blocking_pool
.as_ref()
.map(crate::runtime::blocking_pool::BlockingPool::handle)
}
}
impl Drop for RuntimeInner {
fn drop(&mut self) {
if let Some(shutdown) = self.deadline_monitor_shutdown.take() {
shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
}
if let Some(thread) = self.deadline_monitor_thread.take() {
let _ = thread.join();
}
self.scheduler.shutdown();
if let Some(pool) = self.blocking_pool.take() {
pool.shutdown();
}
let mut handles = lock_state(&self.worker_threads);
for handle in handles.drain(..) {
let _ = handle.join();
}
}
}
struct JoinState<T> {
result: Option<std::thread::Result<T>>,
waker: Option<Waker>,
}
impl<T> JoinState<T> {
fn new() -> Self {
Self {
result: None,
waker: None,
}
}
}
fn lock_state<T>(state: &Mutex<T>) -> MutexGuard<'_, T> {
state.lock()
}
fn complete_task<T>(state: &Arc<Mutex<JoinState<T>>>, output: std::thread::Result<T>) {
let waker = {
let mut guard = lock_state(state);
guard.result = Some(output);
guard.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
}
struct ThreadWaker {
thread: std::thread::Thread,
woken: std::sync::atomic::AtomicBool,
}
use std::task::Wake;
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
self.woken.store(true, std::sync::atomic::Ordering::Release);
self.thread.unpark();
}
fn wake_by_ref(self: &Arc<Self>) {
self.woken.store(true, std::sync::atomic::Ordering::Release);
self.thread.unpark();
}
}
fn run_future_with_budget<F: Future>(future: F, poll_budget: u32) -> F::Output {
let thread = std::thread::current();
let thread_waker = Arc::new(ThreadWaker {
thread,
woken: std::sync::atomic::AtomicBool::new(false),
});
let waker = Waker::from(Arc::clone(&thread_waker));
let mut cx = Context::from_waker(&waker);
let mut future = std::pin::pin!(future);
let mut polls = 0u32;
let budget = poll_budget.max(1);
let mut consecutive_budget_exhaustions: u32 = 0;
loop {
thread_waker
.woken
.store(false, std::sync::atomic::Ordering::Relaxed);
match future.as_mut().poll(&mut cx) {
Poll::Ready(output) => return output,
Poll::Pending => {
if thread_waker
.woken
.load(std::sync::atomic::Ordering::Acquire)
{
polls = polls.saturating_add(1);
if polls >= budget {
consecutive_budget_exhaustions =
consecutive_budget_exhaustions.saturating_add(1);
let backoff_ms = match consecutive_budget_exhaustions {
1 => 1,
2 => 5,
_ => 25,
};
std::thread::sleep(Duration::from_millis(backoff_ms));
polls = 0;
}
} else {
polls = 0;
consecutive_budget_exhaustions = 0;
std::thread::park();
}
}
}
}
}
fn build_request_cx_from_inner(inner: &Arc<RuntimeInner>, budget: Budget) -> crate::cx::Cx {
let task = crate::types::TaskId::new_ephemeral();
let (observability, io_driver, timer_driver, blocking_pool, logical_clock, entropy, trace) = {
let guard = inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let timer_driver = guard.timer_driver_handle();
let logical_clock = guard
.logical_clock_mode()
.build_handle(timer_driver.clone());
(
guard.observability_for_task(inner.root_region, task),
guard.io_driver_handle(),
timer_driver,
guard.blocking_pool_handle(),
logical_clock,
guard.entropy_source().fork(task),
guard.trace_handle(),
)
};
let request_cx = crate::cx::Cx::new_with_drivers(
inner.root_region,
task,
budget,
observability,
io_driver,
None,
timer_driver,
Some(entropy),
)
.with_blocking_pool_handle(blocking_pool)
.with_logical_clock(logical_clock);
request_cx.set_trace_buffer(trace);
request_cx
}
#[cfg(test)]
#[allow(unsafe_code)]
mod tests {
use super::*;
use crate::cx::Cx;
use crate::lab::{LabConfig, LabRuntime};
use crate::record::TaskRecord;
use crate::runtime::reactor::{Event, Interest, LabReactor, Reactor};
use crate::test_utils::init_test_logging;
use crate::time::sleep;
use crate::trace::{TraceEvent, TraceEventKind};
use crate::types::{Budget, CancelReason, CxInner, Time};
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::time::Duration;
static CURRENT_HANDLE_DTOR_STATE: AtomicU8 = AtomicU8::new(0);
thread_local! {
static CURRENT_HANDLE_DTOR_PROBE: CurrentHandleDtorProbe = const { CurrentHandleDtorProbe };
}
struct CurrentHandleDtorProbe;
impl Drop for CurrentHandleDtorProbe {
fn drop(&mut self) {
let state = match CURRENT_RUNTIME_HANDLE.try_with(|cell| cell.borrow().clone()) {
Ok(Some(_)) => 1,
Ok(None) => 2,
Err(_) => {
if Runtime::current_handle().is_none() {
3
} else {
4
}
}
};
CURRENT_HANDLE_DTOR_STATE.store(state, Ordering::SeqCst);
}
}
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
match payload.downcast::<String>() {
Ok(message) => *message,
Err(payload) => payload.downcast::<&'static str>().map_or_else(
|_| "<non-string panic payload>".to_string(),
|message| (*message).to_string(),
),
}
}
#[derive(Default)]
struct RecordingNativeHostServices {
worker_bootstrap_calls: AtomicUsize,
deadline_monitor_calls: AtomicUsize,
}
impl RuntimeHostServices for RecordingNativeHostServices {
fn kind(&self) -> RuntimeHostServicesKind {
RuntimeHostServicesKind::NativeStdThread
}
fn spawn_workers(
&self,
runtime: &Arc<RuntimeInner>,
workers: Vec<ThreeLaneWorker>,
) -> io::Result<Vec<std::thread::JoinHandle<()>>> {
self.worker_bootstrap_calls.fetch_add(1, Ordering::SeqCst);
NativeThreadHostServices::spawn_worker_threads(runtime, workers)
}
fn start_deadline_monitor(
&self,
config: &RuntimeConfig,
state: &Arc<crate::sync::ContendedMutex<RuntimeState>>,
) -> DeadlineMonitorHostService {
self.deadline_monitor_calls.fetch_add(1, Ordering::SeqCst);
NativeThreadHostServices::start_deadline_monitor(config, state)
}
}
#[test]
fn browser_host_services_contract_pins_threadless_startup_requirements() {
let contract = BrowserHostServicesContract::V1;
assert!(
contract
.required_capabilities
.contains(&"host-turn wakeups"),
"browser contract must require host-turn wakeups"
);
assert!(
contract
.required_capabilities
.contains(&"worker bootstrap hooks"),
"browser contract must require worker bootstrap hooks"
);
assert!(
contract
.required_capabilities
.contains(&"timer/deadline driving"),
"browser contract must require timer/deadline driving"
);
assert!(
contract
.required_capabilities
.contains(&"lane-health callbacks"),
"browser contract must require lane-health callbacks"
);
assert!(
contract
.diagnostic_requirements()
.contains("threadless startup"),
"diagnostics should explain why the browser path is threadless"
);
}
#[test]
fn browser_bootstrap_error_describes_host_services_requirements() {
let message = unsupported_browser_bootstrap_message(&NativeThreadHostServices::new());
assert!(
message.contains("RuntimeHostServices seam"),
"diagnostic should name the startup seam: {message}"
);
assert!(
message.contains("native-std-thread"),
"diagnostic should name the shipped native host implementation: {message}"
);
assert!(
message.contains("host-turn wakeups") && message.contains("lane-health callbacks"),
"diagnostic should enumerate the missing browser host requirements: {message}"
);
assert!(
message.contains("threadless startup"),
"diagnostic should explain the threadless browser target: {message}"
);
}
fn browser_probe(
host_role: BrowserExecutionHostRole,
runtime_context: BrowserRuntimeContext,
has_window: bool,
has_document: bool,
has_webassembly: bool,
) -> BrowserExecutionProbe {
BrowserExecutionProbe {
has_global_this: true,
runtime_context,
host_role,
capabilities: BrowserCapabilitySnapshot {
execution_api: BrowserExecutionApiCapabilities {
has_abort_controller: true,
has_fetch: true,
has_webassembly,
},
dom: BrowserDomCapabilities {
has_document,
has_window,
},
storage: browser_storage_capabilities_for_host_role(host_role),
transport: BrowserTransportCapabilities {
has_web_socket: true,
has_web_transport: false,
},
},
}
}
#[test]
fn browser_execution_ladder_selects_main_thread_lane_for_supported_probe() {
let diagnostics = build_browser_execution_ladder_from_probe(
None,
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
true,
),
);
assert!(
diagnostics.supported,
"main-thread probe should be supported"
);
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"main-thread probe should select the main-thread direct-runtime lane"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::Supported,
"supported probe should keep the supported reason code"
);
assert_eq!(
diagnostics.host_role,
BrowserExecutionHostRole::BrowserMainThread,
"host role should stay on the browser main thread"
);
assert_eq!(
diagnostics.runtime_context,
BrowserRuntimeContext::BrowserMainThread,
"runtime context should stay on the browser main thread"
);
let selected_candidate = diagnostics
.candidates
.iter()
.find(|candidate| candidate.selected)
.expect("one selected candidate");
assert_eq!(
selected_candidate.lane_id,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"selected candidate should match the selected lane"
);
assert!(
diagnostics.capabilities.storage.has_indexed_db,
"main-thread probe should surface IndexedDB availability in ladder diagnostics"
);
assert!(
diagnostics.capabilities.storage.has_local_storage,
"main-thread probe should surface localStorage availability in ladder diagnostics"
);
}
#[test]
fn browser_execution_ladder_preserves_truthful_lane_when_preferred_lane_mismatches() {
let diagnostics = build_browser_execution_ladder_from_probe(
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime),
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
true,
),
);
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"preferred-lane mismatch must not override the truthful selected lane"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::Supported,
"preferred-lane mismatch should not rewrite the truthful selected reason"
);
assert!(
diagnostics
.message
.contains("lane.browser.dedicated_worker.direct_runtime"),
"message should name the preferred lane mismatch"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("switch entrypoints")),
"guidance should explain how to satisfy the preferred lane"
);
}
#[test]
fn browser_execution_ladder_keeps_prerequisite_reason_when_preferred_lane_fails_closed() {
let diagnostics = build_browser_execution_ladder_from_probe(
Some(BrowserExecutionLane::BrowserMainThreadDirectRuntime),
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
false,
),
);
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::Unsupported,
"missing prerequisites should still fail closed to lane.unsupported"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::MissingWebAssembly,
"preferred-lane mismatch must preserve the real missing-prerequisite reason"
);
assert!(
diagnostics.message.contains("missing_webassembly"),
"message should preserve the real missing-prerequisite reason code"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("Restore the reported Browser Edition prerequisites")),
"guidance should explain how to restore the missing prerequisite"
);
}
#[test]
fn browser_execution_ladder_distinguishes_intentional_fail_closed_preference() {
let diagnostics = build_browser_execution_ladder_from_probe(
Some(BrowserExecutionLane::Unsupported),
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
true,
),
);
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"preferred fallback pin must not override the truthful direct-runtime lane"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::Supported,
"preferred fallback pin should not rewrite the selected reason"
);
assert!(
diagnostics
.message
.contains("lower-priority fail-closed fallback"),
"message should describe the explicit fallback pin instead of a host-role mismatch"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("Only pin")),
"guidance should explain that lane.unsupported is an intentional fail-closed pin"
);
}
#[test]
fn browser_execution_ladder_fail_closes_non_browser_probe() {
let diagnostics =
build_browser_execution_ladder_from_probe(None, BrowserExecutionProbe::non_browser());
assert!(!diagnostics.supported, "non-browser probe must fail closed");
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::Unsupported,
"non-browser probe must demote to the terminal unsupported lane"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::MissingGlobalThis,
"non-browser probe should surface the missing-global diagnostic"
);
}
#[test]
fn browser_execution_ladder_fail_closes_service_worker_probe_with_not_shipped_reason() {
let diagnostics = RuntimeBuilder::new()
.inspect_browser_execution_ladder_for_probe(BrowserExecutionProbe::service_worker());
assert!(!diagnostics.supported, "service worker must fail close");
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::Unsupported,
"service worker must remain on the fail-closed lane"
);
assert_eq!(
diagnostics.host_role,
BrowserExecutionHostRole::ServiceWorker,
"service worker probe must preserve host role"
);
assert_eq!(
diagnostics.runtime_context,
BrowserRuntimeContext::ServiceWorker,
"service worker probe must preserve the explicit service-worker runtime context"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::ServiceWorkerDirectRuntimeNotShipped,
"service worker probe must preserve the explicit not-shipped reason"
);
assert_eq!(
diagnostics.runtime_support.reason,
BrowserRuntimeSupportReason::ServiceWorkerNotYetShipped,
"runtime-support reason must stay aligned with the execution-ladder reason"
);
assert_eq!(
diagnostics.runtime_support.runtime_context,
BrowserRuntimeContext::ServiceWorker,
"runtime-support diagnostics must preserve the explicit service-worker runtime context"
);
assert!(
diagnostics.capabilities.storage.has_indexed_db,
"service-worker probe should surface truthful IndexedDB support even while direct runtime stays fail-closed"
);
assert!(
!diagnostics.capabilities.storage.has_local_storage,
"service-worker probe should keep localStorage unavailable"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("service-worker broker")),
"service-worker guidance should point callers at the bounded broker helpers"
);
}
#[test]
fn browser_service_worker_broker_support_is_explicit_for_service_worker_probe() {
let diagnostics = RuntimeBuilder::new()
.inspect_browser_service_worker_broker_support_for_probe(
BrowserExecutionProbe::service_worker(),
);
assert!(
diagnostics.supported,
"service-worker broker support must be explicit for a truthful service-worker probe"
);
assert_eq!(
diagnostics.contract_id, BROWSER_SERVICE_WORKER_BROKER_CONTRACT_ID,
"service-worker broker contract id must stay stable"
);
assert_eq!(
diagnostics.requested_lane, BROWSER_SERVICE_WORKER_BROKER_LANE,
"service-worker broker lane id must stay stable"
);
assert_eq!(
diagnostics.fallback_target,
BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime,
"service-worker broker should prefer the dedicated-worker fallback first"
);
assert_eq!(
diagnostics.fallback_lane_id,
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime),
"service-worker broker should map its first fallback to the dedicated-worker lane"
);
assert_eq!(
diagnostics.reason,
BrowserServiceWorkerBrokerSupportReason::Supported,
"service-worker broker reason must remain supported on the truthful host"
);
assert_eq!(
diagnostics.direct_runtime_reason,
BrowserRuntimeSupportReason::ServiceWorkerNotYetShipped,
"service-worker broker must preserve the fail-closed direct-runtime truth"
);
assert_eq!(
diagnostics.direct_execution_reason_code,
BrowserExecutionReasonCode::ServiceWorkerDirectRuntimeNotShipped,
"service-worker broker must preserve the fail-closed direct execution reason"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("registration-scope")),
"service-worker broker guidance should preserve explicit broker-admission boundaries"
);
}
#[test]
fn browser_execution_ladder_fail_closes_shared_worker_probe_with_not_shipped_reason() {
let diagnostics = RuntimeBuilder::new()
.inspect_browser_execution_ladder_for_probe(BrowserExecutionProbe::shared_worker());
assert!(!diagnostics.supported, "shared worker must fail close");
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::Unsupported,
"shared worker must remain on the fail-closed lane"
);
assert_eq!(
diagnostics.host_role,
BrowserExecutionHostRole::SharedWorker,
"shared worker probe must preserve host role"
);
assert_eq!(
diagnostics.runtime_context,
BrowserRuntimeContext::SharedWorker,
"shared worker probe must preserve the explicit shared-worker runtime context"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::SharedWorkerDirectRuntimeNotShipped,
"shared worker probe must preserve the explicit not-shipped reason"
);
assert_eq!(
diagnostics.runtime_support.reason,
BrowserRuntimeSupportReason::SharedWorkerNotYetShipped,
"runtime-support reason must stay aligned with the execution-ladder reason"
);
assert_eq!(
diagnostics.runtime_support.runtime_context,
BrowserRuntimeContext::SharedWorker,
"runtime-support diagnostics must preserve the explicit shared-worker runtime context"
);
assert!(
diagnostics.capabilities.storage.has_indexed_db,
"shared-worker probe should surface truthful IndexedDB support even while direct runtime stays fail-closed"
);
assert!(
!diagnostics.capabilities.storage.has_local_storage,
"shared-worker probe should keep localStorage unavailable"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("shared-worker coordinator")),
"shared-worker guidance should point callers at the bounded coordinator helpers"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("browser main-thread or dedicated-worker")),
"shared-worker guidance should preserve the truthful caller boundary"
);
}
#[test]
fn browser_shared_worker_coordinator_support_is_explicit_for_supported_callers() {
let main_thread = RuntimeBuilder::new()
.inspect_browser_shared_worker_coordinator_support_for_probe(
BrowserExecutionProbe::browser_main_thread(),
);
let dedicated_worker = RuntimeBuilder::new()
.inspect_browser_shared_worker_coordinator_support_for_probe(
BrowserExecutionProbe::dedicated_worker(),
);
assert!(
main_thread.supported,
"browser main thread must remain a truthful shared-worker coordinator caller"
);
assert_eq!(
main_thread.contract_id, BROWSER_SHARED_WORKER_COORDINATOR_CONTRACT_ID,
"shared-worker coordinator contract id must stay stable"
);
assert_eq!(
main_thread.requested_lane, BROWSER_SHARED_WORKER_COORDINATOR_LANE,
"shared-worker coordinator lane id must stay stable"
);
assert_eq!(
main_thread.fallback_target,
BrowserWorkerFallbackTarget::BrowserMainThreadDirectRuntime,
"browser main-thread callers should preserve their current lane as the first truthful fallback"
);
assert_eq!(
main_thread.fallback_lane_id,
Some(BrowserExecutionLane::BrowserMainThreadDirectRuntime),
"browser main-thread callers should map the first fallback to the current direct-runtime lane"
);
assert_eq!(
main_thread.reason,
BrowserSharedWorkerCoordinatorSupportReason::Supported,
"browser main-thread callers must preserve the bounded coordinator support reason"
);
assert_eq!(
main_thread.direct_runtime_reason,
BrowserRuntimeSupportReason::SharedWorkerNotYetShipped,
"caller-side coordinator diagnostics must preserve the shared-worker fail-closed truth"
);
assert_eq!(
main_thread.direct_execution_reason_code,
BrowserExecutionReasonCode::SharedWorkerDirectRuntimeNotShipped,
"caller-side coordinator diagnostics must preserve the shared-worker execution reason"
);
assert!(
main_thread
.guidance
.iter()
.any(|entry| entry.contains("same-origin")),
"shared-worker coordinator guidance should preserve explicit JS attach prerequisites"
);
assert!(
dedicated_worker.supported,
"dedicated-worker callers must remain truthful shared-worker coordinator callers"
);
assert_eq!(
dedicated_worker.fallback_target,
BrowserWorkerFallbackTarget::DedicatedWorkerDirectRuntime,
"dedicated-worker callers should preserve their current lane as the first truthful fallback"
);
assert_eq!(
dedicated_worker.fallback_lane_id,
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime),
"dedicated-worker callers should map the first fallback to the current direct-runtime lane"
);
}
#[test]
fn browser_shared_worker_coordinator_support_rejects_shared_worker_host() {
let diagnostics = RuntimeBuilder::new()
.inspect_browser_shared_worker_coordinator_support_for_probe(
BrowserExecutionProbe::shared_worker(),
);
assert!(
!diagnostics.supported,
"shared-worker coordinator preflight must reject the shared-worker host itself"
);
assert_eq!(
diagnostics.reason,
BrowserSharedWorkerCoordinatorSupportReason::SharedWorkerApiMissing,
"shared-worker coordinator preflight must preserve the unsupported-caller reason"
);
assert!(
diagnostics
.guidance
.iter()
.any(|entry| entry.contains("browser main-thread or dedicated-worker")),
"shared-worker coordinator rejection guidance must preserve the truthful caller boundary"
);
}
#[test]
fn browser_execution_ladder_preserves_truthful_worker_storage_snapshots() {
let dedicated = RuntimeBuilder::new()
.inspect_browser_execution_ladder_for_probe(BrowserExecutionProbe::dedicated_worker());
assert!(
dedicated.capabilities.storage.has_indexed_db,
"dedicated-worker probe should surface IndexedDB availability"
);
assert!(
!dedicated.capabilities.storage.has_local_storage,
"dedicated-worker probe should keep localStorage unavailable"
);
}
#[test]
fn browser_execution_ladder_keeps_missing_webassembly_visible_in_candidates() {
let diagnostics = build_browser_execution_ladder_from_probe(
None,
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
false,
),
);
assert_eq!(
diagnostics.selected_lane,
BrowserExecutionLane::Unsupported,
"missing WebAssembly must fail closed to the unsupported lane"
);
assert_eq!(
diagnostics.reason_code,
BrowserExecutionReasonCode::MissingWebAssembly,
"selected reason should preserve the real missing-prerequisite failure"
);
let direct_candidate = diagnostics
.candidates
.iter()
.find(|candidate| {
candidate.lane_id == BrowserExecutionLane::BrowserMainThreadDirectRuntime
})
.expect("main-thread candidate");
assert_eq!(
direct_candidate.reason_code,
BrowserExecutionReasonCode::CandidatePrerequisiteMissing,
"direct lane candidate should remain a prerequisite-missing rejection"
);
}
#[test]
fn browser_runtime_builder_selection_constructs_runtime_for_supported_probe() {
let selection = build_browser_runtime_selection_from_probe(
None,
None,
WasmAbortPropagationMode::Bidirectional,
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
true,
),
);
assert!(
selection.runtime_available(),
"supported probe should construct a preview browser runtime"
);
assert_eq!(
selection.execution_ladder.selected_lane,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"supported probe should stay on the truthful main-thread lane"
);
let runtime = selection.runtime.expect("supported runtime");
let scope = runtime
.enter_scope(Some("browser-runtime-selection-smoke"))
.expect("scope should open");
let scope_close = runtime
.close_scope(&scope)
.expect("scope close should succeed");
assert!(
matches!(scope_close, WasmAbiOutcomeEnvelope::Ok { .. }),
"scope close should return an ok outcome"
);
let runtime_close = runtime.close().expect("runtime close should succeed");
assert!(
matches!(runtime_close, WasmAbiOutcomeEnvelope::Ok { .. }),
"runtime close should return an ok outcome"
);
assert!(
runtime.dispatcher_diagnostics().is_clean(),
"dispatcher should be clean after full runtime teardown"
);
}
#[test]
fn browser_runtime_builder_selection_preserves_truthful_lane_under_mismatch() {
let selection = build_browser_runtime_selection_from_probe(
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime),
None,
WasmAbortPropagationMode::Bidirectional,
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
true,
),
);
assert!(
selection.runtime_available(),
"preferred-lane mismatch should still construct a runtime when a truthful lane exists"
);
assert_eq!(
selection.execution_ladder.selected_lane,
BrowserExecutionLane::BrowserMainThreadDirectRuntime,
"preferred-lane mismatch must preserve the truthful selected lane"
);
assert_eq!(
selection.execution_ladder.preferred_lane,
Some(BrowserExecutionLane::DedicatedWorkerDirectRuntime),
"selection should retain the requested preferred lane for diagnostics"
);
}
#[test]
fn browser_runtime_builder_selection_fail_closes_when_webassembly_missing() {
let selection = build_browser_runtime_selection_from_probe(
None,
None,
WasmAbortPropagationMode::Bidirectional,
browser_probe(
BrowserExecutionHostRole::BrowserMainThread,
BrowserRuntimeContext::BrowserMainThread,
true,
true,
false,
),
);
assert!(
!selection.runtime_available(),
"missing WebAssembly must fail close instead of constructing a runtime"
);
let error = selection.error.expect("structured unsupported error");
assert!(matches!(
error,
BrowserRuntimeBuildError::Unsupported { .. }
));
assert_eq!(
error.execution_ladder().reason_code,
BrowserExecutionReasonCode::MissingWebAssembly,
"structured unsupported error must preserve the real missing-prerequisite reason"
);
}
#[test]
fn browser_runtime_builder_build_returns_structured_unsupported_error() {
let error = BrowserRuntimeBuilder::new().build().expect_err(
"native test host should fail-close instead of constructing a browser runtime",
);
assert!(matches!(
error,
BrowserRuntimeBuildError::Unsupported { .. }
));
assert_eq!(
error.execution_ladder().selected_lane,
BrowserExecutionLane::Unsupported,
"native host should fail-close to lane.unsupported"
);
}
#[test]
fn runtime_builder_routes_native_startup_through_host_services_seam() {
init_test_logging();
let host_services = Arc::new(RecordingNativeHostServices::default());
let seam: Arc<dyn RuntimeHostServices> = host_services.clone();
let mut builder = RuntimeBuilder::current_thread();
builder.host_services = seam;
let runtime = builder.build().expect("runtime build");
let result = runtime.block_on(runtime.handle().spawn(async { 7u32 }));
assert_eq!(result, 7, "runtime should remain usable through the seam");
assert_eq!(
host_services.worker_bootstrap_calls.load(Ordering::SeqCst),
1,
"worker startup should route through the host-services seam"
);
assert_eq!(
host_services.deadline_monitor_calls.load(Ordering::SeqCst),
1,
"deadline-monitor startup should route through the host-services seam"
);
}
#[test]
fn native_deadline_monitor_releases_runtime_state_before_warning_callback() {
init_test_logging();
let state = Arc::new(crate::sync::ContendedMutex::new(
"runtime-state",
RuntimeState::new(),
));
{
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let region = guard.create_root_region(Budget::INFINITE);
guard.now = Time::from_secs(100);
let budget = Budget::new().with_deadline(Time::from_secs(110));
let idx = guard.insert_task_with(|idx| {
let task_id = crate::types::TaskId::from_arena(idx);
let mut record = TaskRecord::new_with_time(task_id, region, budget, Time::ZERO);
let mut inner = CxInner::new(region, task_id, budget);
inner.checkpoint_state.last_checkpoint = Some(Time::ZERO);
inner.checkpoint_state.checkpoint_count = 1;
record.set_cx_inner(Arc::new(RwLock::new(inner)));
record
});
let task_id = crate::types::TaskId::from_arena(idx);
guard
.regions
.get(region.arena_index())
.expect("root region exists")
.add_task(task_id)
.expect("task admission succeeds");
}
let (tx, rx) = std::sync::mpsc::channel();
let state_for_handler = Arc::clone(&state);
let mut config = RuntimeConfig::default();
config.thread_name_prefix = "deadline-monitor-test".to_string();
config.deadline_monitor = Some(MonitorConfig {
check_interval: Duration::from_millis(1),
warning_threshold_fraction: 0.2,
checkpoint_timeout: Duration::from_millis(1),
adaptive: AdaptiveDeadlineConfig::default(),
enabled: true,
});
config.deadline_warning_handler = Some(Arc::new(move |_| {
let reacquired = state_for_handler.try_lock().is_ok();
let _ = tx.send(reacquired);
}));
let service = NativeThreadHostServices::start_deadline_monitor(&config, &state);
let reacquired = rx
.recv_timeout(Duration::from_secs(1))
.expect("deadline warning callback should fire");
if let Some(shutdown) = service.shutdown.as_ref() {
shutdown.store(true, Ordering::Relaxed);
}
if let Some(thread) = service.thread {
thread.join().expect("deadline monitor thread should stop");
}
assert!(
reacquired,
"warning callback must run after dropping the runtime-state lock"
);
}
#[test]
fn runtime_handle_spawn_completes_via_scheduler() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(2)
.build()
.expect("runtime build");
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let handle = runtime.handle().spawn(async move {
flag_clone.store(true, Ordering::SeqCst);
42u32
});
let result = runtime.block_on(handle);
assert_eq!(result, 42);
assert!(flag.load(Ordering::SeqCst));
}
#[test]
fn runtime_spawn_blocking_executes_on_pool() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.blocking_threads(1, 2)
.build()
.expect("runtime build");
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let handle = runtime
.spawn_blocking(move || {
flag_clone.store(true, Ordering::SeqCst);
})
.expect("blocking pool configured");
handle.wait();
assert!(flag.load(Ordering::SeqCst), "blocking task should have run");
}
#[test]
fn runtime_without_blocking_pool_returns_none() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.blocking_threads(0, 0)
.build()
.expect("runtime build");
let handle = runtime.spawn_blocking(|| {});
assert!(
handle.is_none(),
"spawn_blocking should return None when pool is not configured"
);
assert!(
runtime.blocking_handle().is_none(),
"blocking_handle should return None"
);
}
#[test]
fn runtime_builder_platform_seams_propagate_into_task_contexts() {
init_test_logging();
let io_driver = IoDriverHandle::new(Arc::new(LabReactor::new()));
{
let mut driver = io_driver.lock();
let _ = driver.register_waker(noop_waker());
}
let virtual_clock = Arc::new(crate::time::VirtualClock::starting_at(Time::from_secs(42)));
let timer_driver = TimerDriverHandle::with_virtual_clock(virtual_clock);
let runtime = RuntimeBuilder::current_thread()
.with_io_driver(io_driver)
.with_timer_driver(timer_driver)
.with_entropy_source(Arc::new(crate::util::DetEntropy::new(1234)))
.build()
.expect("runtime build");
let (io_present, timer_now, entropy_source) =
runtime.block_on(runtime.handle().spawn(async {
let cx = Cx::current().expect("task context");
(
cx.io_driver_handle().is_some(),
cx.timer_driver().map(|driver| driver.now()),
cx.entropy().source_id(),
)
}));
assert!(io_present, "injected io driver should be visible in Cx");
assert_eq!(
timer_now,
Some(Time::from_secs(42)),
"injected virtual timer should be visible in Cx"
);
assert_eq!(
entropy_source, "deterministic",
"injected entropy source should flow through Cx"
);
let guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let state_io = guard.io_driver_handle().expect("runtime io driver");
assert_eq!(
state_io.waker_count(),
1,
"runtime should retain the injected io driver instance"
);
let state_timer = guard.timer_driver_handle().expect("runtime timer driver");
assert_eq!(
state_timer.now(),
Time::from_secs(42),
"runtime should retain the injected timer driver instance"
);
drop(guard);
}
#[test]
fn runtime_builder_platform_seams_override_reactor_defaults() {
init_test_logging();
let custom_driver = IoDriverHandle::new(Arc::new(LabReactor::new()));
{
let mut driver = custom_driver.lock();
let _ = driver.register_waker(noop_waker());
}
let custom_timer = TimerDriverHandle::with_virtual_clock(Arc::new(
crate::time::VirtualClock::starting_at(Time::from_secs(7)),
));
let runtime = RuntimeBuilder::current_thread()
.with_reactor(Arc::new(LabReactor::new()))
.with_io_driver(custom_driver)
.with_timer_driver(custom_timer)
.build()
.expect("runtime build");
let guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let io = guard.io_driver_handle().expect("io driver");
assert_eq!(
io.waker_count(),
1,
"explicit io driver should override default reactor wiring"
);
let timer = guard.timer_driver_handle().expect("timer driver");
assert_eq!(
timer.now(),
Time::from_secs(7),
"explicit timer driver should override wall-clock default"
);
drop(guard);
}
#[test]
fn runtime_builder_browser_worker_offload_policy_round_trips() {
init_test_logging();
let runtime = RuntimeBuilder::current_thread()
.browser_worker_offload_enabled(true)
.browser_worker_offload_limits(2048, 4)
.browser_worker_transfer_mode(
crate::runtime::config::WorkerTransferMode::CloneStructured,
)
.browser_worker_cancellation_mode(
crate::runtime::config::WorkerCancellationMode::BestEffortAbort,
)
.build()
.expect("runtime build");
let offload = runtime.config().browser_worker_offload;
assert!(offload.enabled, "offload policy should be enabled");
assert_eq!(
offload.min_task_cost, 2048,
"min task cost should round-trip"
);
assert_eq!(
offload.max_in_flight, 4,
"in-flight limit should round-trip"
);
assert_eq!(
offload.transfer_mode,
crate::runtime::config::WorkerTransferMode::CloneStructured,
"transfer mode should round-trip"
);
assert_eq!(
offload.cancellation_mode,
crate::runtime::config::WorkerCancellationMode::BestEffortAbort,
"cancellation mode should round-trip"
);
}
#[cfg(target_arch = "wasm32")]
#[test]
fn runtime_builder_fail_closes_browser_bootstrap_on_wasm() {
let err = RuntimeBuilder::current_thread()
.build()
.expect_err("public browser bootstrap must fail closed on wasm");
assert_eq!(
err.kind(),
crate::error::ErrorKind::ConfigError,
"unsupported wasm browser bootstrap must surface as ConfigError"
);
let message = err.to_string();
assert!(
message.contains("browser bootstrap")
&& message.contains("RuntimeHostServices seam")
&& message.contains("threadless startup"),
"error should explain why wasm browser bootstrap is still unsupported: {message}"
);
}
#[derive(Debug, PartialEq, Eq)]
struct TraceCounts {
region_created: usize,
spawn: usize,
complete: usize,
timer_scheduled: usize,
timer_fired: usize,
timer_cancelled: usize,
io_requested: usize,
io_ready: usize,
cancel_request: usize,
}
fn parity_counts(events: Vec<TraceEvent>) -> TraceCounts {
let mut counts = TraceCounts {
region_created: 0,
spawn: 0,
complete: 0,
timer_scheduled: 0,
timer_fired: 0,
timer_cancelled: 0,
io_requested: 0,
io_ready: 0,
cancel_request: 0,
};
for event in events {
match event.kind {
TraceEventKind::RegionCreated => counts.region_created += 1,
TraceEventKind::Spawn => counts.spawn += 1,
TraceEventKind::Complete => counts.complete += 1,
TraceEventKind::TimerScheduled => counts.timer_scheduled += 1,
TraceEventKind::TimerFired => counts.timer_fired += 1,
TraceEventKind::TimerCancelled => counts.timer_cancelled += 1,
TraceEventKind::IoRequested => counts.io_requested += 1,
TraceEventKind::IoReady => counts.io_ready += 1,
TraceEventKind::CancelRequest => counts.cancel_request += 1,
_ => {}
}
}
counts
}
fn wait_for_runtime_quiescent(runtime: &Runtime) {
for i in 0..2000 {
let live_tasks = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.live_task_count();
if live_tasks == 0 {
return;
}
if i < 100 {
std::thread::yield_now();
} else {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
unreachable!("runtime failed to reach quiescence after waiting");
}
#[cfg(unix)]
struct TestFdSource;
#[cfg(unix)]
impl std::os::fd::AsRawFd for TestFdSource {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
0
}
}
#[test]
fn lab_runtime_matches_prod_trace_for_basic_spawn() {
init_test_logging();
let mut lab = LabRuntime::new(LabConfig::new(7).trace_capacity(1024));
let lab_region = lab.state.create_root_region(Budget::INFINITE);
for _ in 0..2 {
let (task_id, _handle) = lab
.state
.create_task(lab_region, Budget::INFINITE, async { 1_u8 })
.expect("lab task spawn");
lab.scheduler
.lock()
.schedule(task_id, Budget::INFINITE.priority);
lab.run_until_quiescent();
}
let lab_counts = parity_counts(lab.trace().snapshot());
assert_eq!(
lab_counts.spawn, lab_counts.complete,
"lab trace should complete every spawned task"
);
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
for _ in 0..2 {
let handle = runtime.handle().spawn(async { 1_u8 });
let _ = runtime.block_on(handle);
}
wait_for_runtime_quiescent(&runtime);
let runtime_counts = {
let guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
parity_counts(guard.trace.snapshot())
};
assert_eq!(
runtime_counts.spawn, runtime_counts.complete,
"runtime trace should complete every spawned task"
);
assert_eq!(lab_counts, runtime_counts);
}
async fn sleep_once() {
let now = Cx::current()
.and_then(|cx| cx.timer_driver())
.map_or(Time::ZERO, |driver| driver.now());
sleep(now, Duration::from_millis(1)).await;
}
#[test]
#[ignore = "block_on parks thread on Pending; current-thread runtime cannot drive timers"]
fn lab_runtime_matches_prod_trace_for_timer_sleep() {
init_test_logging();
let mut lab = LabRuntime::new(LabConfig::new(7).trace_capacity(1024));
let lab_region = lab.state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = lab
.state
.create_task(lab_region, Budget::INFINITE, sleep_once())
.expect("lab sleep task spawn");
lab.scheduler
.lock()
.schedule(task_id, Budget::INFINITE.priority);
lab.step_for_test(); lab.advance_time(1_000_000);
lab.run_until_quiescent();
let lab_counts = parity_counts(lab.trace().snapshot());
assert!(
lab_counts.timer_scheduled > 0,
"lab trace should record timer scheduling"
);
assert_eq!(
lab_counts.timer_scheduled, lab_counts.timer_fired,
"lab trace should fire every scheduled timer"
);
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
let handle = runtime.handle().spawn(sleep_once());
runtime.block_on(handle);
wait_for_runtime_quiescent(&runtime);
let runtime_counts = {
let guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
parity_counts(guard.trace.snapshot())
};
assert!(
runtime_counts.timer_scheduled > 0,
"runtime trace should record timer scheduling"
);
assert_eq!(
runtime_counts.timer_scheduled, runtime_counts.timer_fired,
"runtime trace should fire every scheduled timer"
);
assert_eq!(lab_counts, runtime_counts);
}
#[test]
fn lab_runtime_matches_prod_trace_for_cancel_request() {
init_test_logging();
let mut lab = LabRuntime::new(LabConfig::new(7).trace_capacity(1024));
let lab_region = lab.state.create_root_region(Budget::INFINITE);
let _ = lab
.state
.create_task(lab_region, Budget::INFINITE, async {
std::future::pending::<()>().await;
})
.expect("lab task spawn");
let _ = lab
.state
.cancel_request(lab_region, &CancelReason::user("stop"), None);
let lab_counts = parity_counts(lab.trace().snapshot());
assert!(
lab_counts.cancel_request > 0,
"lab trace should record cancel request"
);
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
{
let mut guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let region = runtime.inner.root_region;
let _ = guard
.create_task(region, Budget::INFINITE, async {
std::future::pending::<()>().await;
})
.expect("runtime task spawn");
let _ = guard.cancel_request(region, &CancelReason::user("stop"), None);
}
let runtime_counts = {
let guard = runtime
.inner
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
parity_counts(guard.trace.snapshot())
};
assert!(
runtime_counts.cancel_request > 0,
"runtime trace should record cancel request"
);
assert_eq!(lab_counts, runtime_counts);
}
#[cfg(unix)]
#[test]
fn lab_runtime_matches_prod_trace_for_io_ready() {
init_test_logging();
let mut lab = LabRuntime::new(LabConfig::new(7).trace_capacity(1024));
let handle = lab.state.io_driver_handle().expect("lab io driver");
let registration = handle
.register(&TestFdSource, Interest::READABLE, noop_waker())
.expect("lab register source");
let io_key = registration.token();
lab.lab_reactor()
.inject_event(io_key, Event::readable(io_key), Duration::ZERO);
lab.step_for_test();
let lab_counts = parity_counts(lab.trace().snapshot());
assert!(
lab_counts.io_requested > 0,
"lab trace should record io requested"
);
assert_eq!(
lab_counts.io_requested, lab_counts.io_ready,
"lab trace should record ready after request"
);
let reactor = Arc::new(LabReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let state = RuntimeState::with_reactor(reactor_handle);
let driver = state.io_driver_handle().expect("runtime state io driver");
let registration = driver
.register(&TestFdSource, Interest::READABLE, noop_waker())
.expect("runtime state register source");
let io_key = registration.token();
reactor.inject_event(io_key, Event::readable(io_key), Duration::ZERO);
let trace = state.trace_handle();
let now = Time::ZERO;
let mut seen = HashSet::new();
let _ = driver.turn_with(Some(Duration::ZERO), |event, interest| {
let io_key = event.token.0 as u64;
let interest_bits = interest.unwrap_or(event.ready).bits();
if seen.insert(io_key) {
trace.record_event(|seq| TraceEvent::io_requested(seq, now, io_key, interest_bits));
}
trace.record_event(|seq| TraceEvent::io_ready(seq, now, io_key, event.ready.bits()));
});
let runtime_counts = parity_counts(state.trace.snapshot());
assert!(
runtime_counts.io_requested > 0,
"runtime trace should record io requested"
);
assert_eq!(
runtime_counts.io_requested, runtime_counts.io_ready,
"runtime trace should record ready after request"
);
assert_eq!(lab_counts.io_requested, runtime_counts.io_requested);
assert_eq!(lab_counts.io_ready, runtime_counts.io_ready);
}
fn with_clean_env<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
let _guard = crate::test_utils::env_lock();
clean_env_locked();
f()
}
fn with_envs<F, R>(vars: &[(&str, &str)], f: F) -> R
where
F: FnOnce() -> R,
{
with_clean_env(|| {
for (k, v) in vars {
unsafe { std::env::set_var(k, v) };
}
let result = f();
for (k, _) in vars {
unsafe { std::env::remove_var(k) };
}
result
})
}
fn clean_env_locked() {
use crate::runtime::env_config::*;
for var in &[
ENV_WORKER_THREADS,
ENV_TASK_QUEUE_DEPTH,
ENV_THREAD_STACK_SIZE,
ENV_THREAD_NAME_PREFIX,
ENV_STEAL_BATCH_SIZE,
ENV_CANCEL_LANE_MAX_STREAK,
ENV_ENABLE_GOVERNOR,
ENV_GOVERNOR_INTERVAL,
ENV_ENABLE_ADAPTIVE_CANCEL_STREAK,
ENV_ADAPTIVE_CANCEL_EPOCH_STEPS,
ENV_BLOCKING_MIN_THREADS,
ENV_BLOCKING_MAX_THREADS,
ENV_ENABLE_PARKING,
ENV_POLL_BUDGET,
] {
unsafe { std::env::remove_var(var) };
}
}
#[test]
fn with_env_overrides_applies_env_vars() {
use crate::runtime::env_config::*;
with_envs(
&[(ENV_WORKER_THREADS, "4"), (ENV_POLL_BUDGET, "64")],
|| {
let runtime = RuntimeBuilder::new()
.with_env_overrides()
.expect("env overrides")
.build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 4);
assert_eq!(runtime.config().poll_budget, 64);
},
);
}
#[test]
fn programmatic_overrides_env_vars() {
use crate::runtime::env_config::*;
with_envs(&[(ENV_WORKER_THREADS, "8")], || {
let runtime = RuntimeBuilder::new()
.with_env_overrides()
.expect("env overrides")
.worker_threads(2)
.build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 2);
});
}
#[test]
fn with_env_overrides_invalid_var_returns_error() {
use crate::runtime::env_config::*;
with_envs(&[(ENV_WORKER_THREADS, "not_a_number")], || {
let result = RuntimeBuilder::new().with_env_overrides();
assert!(result.is_err());
});
}
#[test]
fn with_env_overrides_no_vars_uses_defaults() {
with_clean_env(|| {
let defaults = RuntimeConfig::default();
let runtime = RuntimeBuilder::new()
.with_env_overrides()
.expect("env overrides")
.build()
.expect("runtime build");
assert_eq!(
runtime.config().cancel_lane_max_streak,
defaults.cancel_lane_max_streak
);
assert_eq!(runtime.config().enable_governor, defaults.enable_governor);
assert_eq!(
runtime.config().governor_interval,
defaults.governor_interval
);
assert_eq!(
runtime.config().enable_adaptive_cancel_streak,
defaults.enable_adaptive_cancel_streak
);
assert_eq!(
runtime.config().adaptive_cancel_streak_epoch_steps,
defaults.adaptive_cancel_streak_epoch_steps
);
assert_eq!(runtime.config().poll_budget, defaults.poll_budget);
});
}
#[test]
fn with_env_overrides_applies_governor_settings() {
use crate::runtime::env_config::*;
with_envs(
&[(ENV_ENABLE_GOVERNOR, "true"), (ENV_GOVERNOR_INTERVAL, "41")],
|| {
let runtime = RuntimeBuilder::new()
.with_env_overrides()
.expect("env overrides")
.build()
.expect("runtime build");
assert!(runtime.config().enable_governor);
assert_eq!(runtime.config().governor_interval, 41);
},
);
}
#[cfg(feature = "config-file")]
#[test]
fn from_toml_str_builds_runtime() {
let toml = r"
[scheduler]
worker_threads = 2
poll_budget = 32
";
let runtime = RuntimeBuilder::from_toml_str(toml)
.expect("from_toml_str")
.build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 2);
assert_eq!(runtime.config().poll_budget, 32);
}
#[cfg(feature = "config-file")]
#[test]
fn from_toml_str_applies_governor_settings() {
let toml = r"
[scheduler]
enable_governor = true
governor_interval = 80
";
let runtime = RuntimeBuilder::from_toml_str(toml)
.expect("from_toml_str")
.build()
.expect("runtime build");
assert!(runtime.config().enable_governor);
assert_eq!(runtime.config().governor_interval, 80);
}
#[cfg(feature = "config-file")]
#[test]
fn from_toml_str_with_programmatic_override() {
let toml = r"
[scheduler]
worker_threads = 8
";
let runtime = RuntimeBuilder::from_toml_str(toml)
.expect("from_toml_str")
.worker_threads(2) .build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 2);
}
#[cfg(feature = "config-file")]
#[test]
fn from_toml_str_invalid_returns_error() {
let result = RuntimeBuilder::from_toml_str("not valid {{{{");
assert!(result.is_err());
}
#[cfg(feature = "config-file")]
#[test]
fn precedence_programmatic_over_env_over_toml() {
use crate::runtime::env_config::*;
with_envs(&[(ENV_WORKER_THREADS, "8")], || {
let toml = r"
[scheduler]
worker_threads = 16
";
let runtime = RuntimeBuilder::from_toml_str(toml)
.expect("from_toml_str")
.with_env_overrides()
.expect("env overrides")
.worker_threads(2) .build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 2);
});
}
#[cfg(feature = "config-file")]
#[test]
fn precedence_env_over_toml() {
use crate::runtime::env_config::*;
with_envs(&[(ENV_WORKER_THREADS, "8")], || {
let toml = r"
[scheduler]
worker_threads = 16
";
let runtime = RuntimeBuilder::from_toml_str(toml)
.expect("from_toml_str")
.with_env_overrides()
.expect("env overrides")
.build()
.expect("runtime build");
assert_eq!(runtime.config().worker_threads, 8);
});
}
#[test]
fn current_handle_available_inside_block_on() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.build()
.expect("runtime build");
runtime.block_on(async {
let handle = Runtime::current_handle();
assert!(
handle.is_some(),
"current_handle should be Some inside block_on"
);
});
}
#[test]
fn current_handle_none_outside_block_on() {
init_test_logging();
assert!(
Runtime::current_handle().is_none(),
"current_handle should be None outside block_on"
);
}
#[test]
fn current_handle_spawn_completes_on_scheduler() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(2)
.build()
.expect("runtime build");
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let result = runtime.block_on(async move {
let handle = Runtime::current_handle().expect("inside block_on");
let join = handle.spawn(async move {
flag_clone.store(true, Ordering::SeqCst);
99u32
});
join.await
});
assert_eq!(result, 99);
assert!(flag.load(Ordering::SeqCst), "spawned task should have run");
}
#[test]
fn current_handle_available_inside_spawned_task() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(2)
.build()
.expect("runtime build");
let outer = runtime.handle().spawn(async {
let handle = Runtime::current_handle().expect("spawned task should see runtime handle");
handle.spawn(async { 42u32 }).await
});
assert_eq!(runtime.block_on(outer), 42);
}
#[test]
fn current_handle_restored_after_block_on() {
init_test_logging();
assert!(Runtime::current_handle().is_none());
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.build()
.expect("runtime build");
runtime.block_on(async {
assert!(Runtime::current_handle().is_some());
});
assert!(Runtime::current_handle().is_none());
}
#[test]
fn current_handle_returns_none_during_thread_local_teardown() {
init_test_logging();
CURRENT_HANDLE_DTOR_STATE.store(0, Ordering::SeqCst);
let join = std::thread::spawn(|| {
CURRENT_HANDLE_DTOR_PROBE.with(|_| {});
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
runtime.block_on(async {
assert!(
Runtime::current_handle().is_some(),
"runtime handle should be installed inside block_on"
);
});
});
join.join()
.expect("thread-local teardown should not panic when reading runtime handle");
assert_eq!(
CURRENT_HANDLE_DTOR_STATE.load(Ordering::SeqCst),
3,
"Runtime::current_handle() should fail closed once TLS is unavailable"
);
}
#[test]
fn weak_current_handle_try_spawn_returns_runtime_unavailable_after_drop() {
init_test_logging();
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.build()
.expect("runtime build");
let weak_handle = runtime.block_on(runtime.handle().spawn(async {
Runtime::current_handle().expect("spawned task should see runtime handle")
}));
assert!(
matches!(weak_handle.inner, RuntimeHandleRef::Weak(_)),
"worker-thread current_handle should remain weak to avoid runtime cycles"
);
drop(runtime);
let result = weak_handle.try_spawn(async { 42u8 });
assert!(
matches!(result, Err(SpawnError::RuntimeUnavailable)),
"stale weak handle should return RuntimeUnavailable instead of panicking"
);
assert!(
weak_handle.spawn_blocking(|| {}).is_none(),
"stale weak handle should not expose a blocking pool"
);
assert!(
weak_handle.blocking_handle().is_none(),
"stale weak handle should not yield a blocking handle"
);
}
#[test]
fn thread_callbacks_do_not_fire_for_block_on_caller() {
init_test_logging();
let started = Arc::new(AtomicUsize::new(0));
let stopped = Arc::new(AtomicUsize::new(0));
let started_for_callback = Arc::clone(&started);
let stopped_for_callback = Arc::clone(&stopped);
let runtime = RuntimeBuilder::new()
.worker_threads(1)
.on_thread_start(move || {
started_for_callback.fetch_add(1, Ordering::SeqCst);
})
.on_thread_stop(move || {
stopped_for_callback.fetch_add(1, Ordering::SeqCst);
})
.build()
.expect("runtime build");
let join = runtime.handle().spawn(async { 7u8 });
assert_eq!(runtime.block_on(join), 7);
assert_eq!(
started.load(Ordering::SeqCst),
1,
"only the worker thread should trigger on_thread_start"
);
drop(runtime);
assert_eq!(
stopped.load(Ordering::SeqCst),
1,
"only the worker thread should trigger on_thread_stop"
);
}
#[test]
fn join_handle_second_poll_panics_after_success_and_stays_finished() {
init_test_logging();
let state = Arc::new(Mutex::new(JoinState::new()));
complete_task(&state, Ok(7_u8));
let mut join = std::pin::pin!(JoinHandle::new(Arc::clone(&state)));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = join.as_mut().poll(&mut cx);
assert!(matches!(first, Poll::Ready(7)));
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after consuming the result"
);
let second = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = join.as_mut().poll(&mut cx);
}));
let message =
panic_payload_to_string(second.expect_err("second poll must fail closed by panicking"));
assert!(
message.contains("runtime::JoinHandle polled after completion"),
"second poll should panic with completion misuse message, got {message}"
);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after post-completion misuse"
);
}
#[test]
fn join_handle_pending_then_completion_then_repoll_panics_and_stays_finished() {
init_test_logging();
let state = Arc::new(Mutex::new(JoinState::new()));
let mut join = std::pin::pin!(JoinHandle::new(Arc::clone(&state)));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = join.as_mut().poll(&mut cx);
assert!(matches!(first, Poll::Pending));
assert!(
!join.as_ref().get_ref().is_finished(),
"join handle should not be finished while task is still pending"
);
complete_task(&state, Ok(11_u8));
let second = join.as_mut().poll(&mut cx);
assert!(matches!(second, Poll::Ready(11)));
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should become finished after ready output is observed"
);
let third = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = join.as_mut().poll(&mut cx);
}));
let message =
panic_payload_to_string(third.expect_err("third poll must fail closed by panicking"));
assert!(
message.contains("runtime::JoinHandle polled after completion"),
"post-completion repoll should panic with completion misuse message, got {message}"
);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after post-completion misuse"
);
}
#[test]
fn join_handle_second_poll_panics_after_task_panic_and_stays_finished() {
init_test_logging();
let state = Arc::new(Mutex::new(JoinState::<u8>::new()));
complete_task(&state, Err(Box::new("join-handle boom")));
let mut join = std::pin::pin!(JoinHandle::new(Arc::clone(&state)));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = join.as_mut().poll(&mut cx);
}));
let first_message =
panic_payload_to_string(first.expect_err("first poll should resume the task panic"));
assert!(
first_message.contains("join-handle boom"),
"first poll should preserve the original task panic, got {first_message}"
);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after propagating a task panic"
);
let second = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = join.as_mut().poll(&mut cx);
}));
let second_message =
panic_payload_to_string(second.expect_err("second poll must fail closed by panicking"));
assert!(
second_message.contains("runtime::JoinHandle polled after completion"),
"second poll should panic with completion misuse message, got {second_message}"
);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after post-completion misuse"
);
}
#[test]
fn join_handle_is_finished_after_executor_side_disappears() {
init_test_logging();
let state = Arc::new(Mutex::new(JoinState::<u8>::new()));
let mut join = std::pin::pin!(JoinHandle::new(Arc::clone(&state)));
drop(state);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should report terminal dropped-task state as finished"
);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let poll_after_drop = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = join.as_mut().poll(&mut cx);
}));
let message = panic_payload_to_string(
poll_after_drop.expect_err("poll after executor-side disappearance must panic"),
);
assert!(
message.contains("task was dropped or cancelled before completion"),
"poll after executor-side disappearance should preserve dropped-task panic, got {message}"
);
assert!(
join.as_ref().get_ref().is_finished(),
"join handle should remain finished after the terminal dropped-task poll"
);
}
}