use crate::observability::ObservabilityConfig;
use crate::observability::metrics::{MetricsProvider, NoOpMetrics};
use crate::record::RegionLimits;
use crate::runtime::deadline_monitor::{DeadlineWarning, MonitorConfig};
use crate::trace::distributed::LogicalClockMode;
use crate::types::CancelAttributionConfig;
use std::sync::Arc;
#[derive(Clone, Default)]
pub struct BlockingPoolConfig {
pub min_threads: usize,
pub max_threads: usize,
}
impl BlockingPoolConfig {
pub fn normalize(&mut self) {
if self.max_threads < self.min_threads {
self.max_threads = self.min_threads;
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerTransferMode {
CloneStructured,
TransferableOnly,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerCancellationMode {
BestEffortAbort,
RequireAck,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserWorkerOffloadConfig {
pub enabled: bool,
pub min_task_cost: u32,
pub max_in_flight: usize,
pub transfer_mode: WorkerTransferMode,
pub cancellation_mode: WorkerCancellationMode,
pub require_owned_payloads: bool,
}
impl BrowserWorkerOffloadConfig {
pub fn normalize(&mut self) {
if self.min_task_cost == 0 {
self.min_task_cost = 1;
}
if self.max_in_flight == 0 {
self.max_in_flight = 1;
}
}
}
impl Default for BrowserWorkerOffloadConfig {
fn default() -> Self {
Self {
enabled: false,
min_task_cost: 1024,
max_in_flight: 16,
transfer_mode: WorkerTransferMode::TransferableOnly,
cancellation_mode: WorkerCancellationMode::RequireAck,
require_owned_payloads: true,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObligationLeakResponse {
Panic,
Log,
Silent,
Recover,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeakEscalation {
pub threshold: u64,
pub escalate_to: ObligationLeakResponse,
}
impl LeakEscalation {
#[inline]
#[must_use]
pub const fn new(threshold: u64, escalate_to: ObligationLeakResponse) -> Self {
let threshold = if threshold == 0 { 1 } else { threshold };
Self {
threshold,
escalate_to,
}
}
}
#[derive(Clone)]
pub struct RuntimeConfig {
pub worker_threads: usize,
pub thread_stack_size: usize,
pub thread_name_prefix: String,
pub global_queue_limit: usize,
pub steal_batch_size: usize,
pub blocking: BlockingPoolConfig,
pub enable_parking: bool,
pub poll_budget: u32,
pub browser_ready_handoff_limit: usize,
pub browser_worker_offload: BrowserWorkerOffloadConfig,
pub cancel_lane_max_streak: usize,
pub logical_clock_mode: Option<LogicalClockMode>,
pub root_region_limits: Option<RegionLimits>,
pub on_thread_start: Option<Arc<dyn Fn() + Send + Sync>>,
pub on_thread_stop: Option<Arc<dyn Fn() + Send + Sync>>,
pub deadline_monitor: Option<MonitorConfig>,
pub deadline_warning_handler: Option<Arc<dyn Fn(DeadlineWarning) + Send + Sync>>,
pub metrics_provider: Arc<dyn MetricsProvider>,
pub observability: Option<ObservabilityConfig>,
pub cancel_attribution: CancelAttributionConfig,
pub obligation_leak_response: ObligationLeakResponse,
pub leak_escalation: Option<LeakEscalation>,
pub enable_governor: bool,
pub governor_interval: u32,
pub enable_adaptive_cancel_streak: bool,
pub adaptive_cancel_streak_epoch_steps: u32,
}
impl RuntimeConfig {
pub fn normalize(&mut self) {
if self.worker_threads == 0 {
self.worker_threads = 1;
}
if self.thread_stack_size == 0 {
self.thread_stack_size = 2 * 1024 * 1024;
}
if self.steal_batch_size == 0 {
self.steal_batch_size = 1;
}
if self.poll_budget == 0 {
self.poll_budget = 1;
}
if self.cancel_lane_max_streak == 0 {
self.cancel_lane_max_streak = 1;
}
if self.governor_interval == 0 {
self.governor_interval = 1;
}
if self.adaptive_cancel_streak_epoch_steps == 0 {
self.adaptive_cancel_streak_epoch_steps = 1;
}
self.browser_worker_offload.normalize();
if let Some(escalation) = self.leak_escalation.as_mut() {
if escalation.threshold == 0 {
escalation.threshold = 1;
}
}
if self.thread_name_prefix.is_empty() {
self.thread_name_prefix = "asupersync-worker".to_string();
}
self.blocking.normalize();
}
pub(crate) fn default_worker_threads() -> usize {
std::thread::available_parallelism()
.map_or(1, std::num::NonZeroUsize::get)
.max(1)
}
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
worker_threads: Self::default_worker_threads(),
thread_stack_size: 2 * 1024 * 1024,
thread_name_prefix: "asupersync-worker".to_string(),
global_queue_limit: 0,
steal_batch_size: 16,
blocking: BlockingPoolConfig::default(),
enable_parking: true,
poll_budget: 128,
browser_ready_handoff_limit: 0,
browser_worker_offload: BrowserWorkerOffloadConfig::default(),
cancel_lane_max_streak: 16,
logical_clock_mode: None,
root_region_limits: None,
on_thread_start: None,
on_thread_stop: None,
deadline_monitor: None,
deadline_warning_handler: None,
metrics_provider: Arc::new(NoOpMetrics),
observability: None,
cancel_attribution: CancelAttributionConfig::default(),
obligation_leak_response: ObligationLeakResponse::Log,
leak_escalation: None,
enable_governor: false,
governor_interval: 32,
enable_adaptive_cancel_streak: true,
adaptive_cancel_streak_epoch_steps: 128,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn test_default_config_sane() {
init_test("test_default_config_sane");
let config = RuntimeConfig::default();
crate::assert_with_log!(
config.worker_threads >= 1,
"worker_threads",
true,
config.worker_threads >= 1
);
crate::assert_with_log!(
config.thread_stack_size == 2 * 1024 * 1024,
"thread_stack_size",
2 * 1024 * 1024,
config.thread_stack_size
);
crate::assert_with_log!(
!config.thread_name_prefix.is_empty(),
"thread_name_prefix",
true,
!config.thread_name_prefix.is_empty()
);
crate::assert_with_log!(
config.poll_budget == 128,
"poll_budget",
128,
config.poll_budget
);
crate::assert_with_log!(
config.browser_ready_handoff_limit == 0,
"browser_ready_handoff_limit",
0,
config.browser_ready_handoff_limit
);
crate::assert_with_log!(
!config.browser_worker_offload.enabled,
"browser_worker_offload.enabled",
false,
config.browser_worker_offload.enabled
);
crate::assert_with_log!(
config.browser_worker_offload.min_task_cost == 1024,
"browser_worker_offload.min_task_cost",
1024,
config.browser_worker_offload.min_task_cost
);
crate::assert_with_log!(
config.browser_worker_offload.max_in_flight == 16,
"browser_worker_offload.max_in_flight",
16,
config.browser_worker_offload.max_in_flight
);
crate::assert_with_log!(
config.cancel_lane_max_streak == 16,
"cancel_lane_max_streak",
16,
config.cancel_lane_max_streak
);
crate::assert_with_log!(
config.enable_adaptive_cancel_streak,
"enable_adaptive_cancel_streak",
true,
config.enable_adaptive_cancel_streak
);
crate::assert_with_log!(
config.adaptive_cancel_streak_epoch_steps == 128,
"adaptive_cancel_streak_epoch_steps",
128,
config.adaptive_cancel_streak_epoch_steps
);
crate::assert_with_log!(
config.logical_clock_mode.is_none(),
"logical_clock_mode",
"None",
format!("{:?}", config.logical_clock_mode)
);
crate::assert_with_log!(
config.obligation_leak_response == ObligationLeakResponse::Log,
"obligation_leak_response",
ObligationLeakResponse::Log,
config.obligation_leak_response
);
crate::assert_with_log!(
config.cancel_attribution == CancelAttributionConfig::default(),
"cancel_attribution default",
CancelAttributionConfig::default(),
config.cancel_attribution
);
crate::test_complete!("test_default_config_sane");
}
fn zero_minimums_config() -> RuntimeConfig {
RuntimeConfig {
worker_threads: 0,
thread_stack_size: 0,
thread_name_prefix: String::new(),
global_queue_limit: 0,
steal_batch_size: 0,
blocking: BlockingPoolConfig {
min_threads: 4,
max_threads: 1,
},
enable_parking: true,
poll_budget: 0,
browser_ready_handoff_limit: 0,
browser_worker_offload: BrowserWorkerOffloadConfig {
enabled: true,
min_task_cost: 0,
max_in_flight: 0,
transfer_mode: WorkerTransferMode::CloneStructured,
cancellation_mode: WorkerCancellationMode::BestEffortAbort,
require_owned_payloads: false,
},
cancel_lane_max_streak: 0,
root_region_limits: None,
on_thread_start: None,
on_thread_stop: None,
deadline_monitor: None,
deadline_warning_handler: None,
metrics_provider: Arc::new(NoOpMetrics),
observability: None,
cancel_attribution: CancelAttributionConfig::new(1, 256),
obligation_leak_response: ObligationLeakResponse::Log,
leak_escalation: None,
logical_clock_mode: None,
enable_governor: false,
governor_interval: 0,
enable_adaptive_cancel_streak: false,
adaptive_cancel_streak_epoch_steps: 0,
}
}
fn assert_normalized_minimums(config: &RuntimeConfig) {
crate::assert_with_log!(
config.worker_threads == 1,
"worker_threads",
1,
config.worker_threads
);
crate::assert_with_log!(
config.thread_stack_size == 2 * 1024 * 1024,
"thread_stack_size",
2 * 1024 * 1024,
config.thread_stack_size
);
crate::assert_with_log!(
config.steal_batch_size == 1,
"steal_batch_size",
1,
config.steal_batch_size
);
crate::assert_with_log!(
config.poll_budget == 1,
"poll_budget",
1,
config.poll_budget
);
crate::assert_with_log!(
config.browser_ready_handoff_limit == 0,
"browser_ready_handoff_limit",
0,
config.browser_ready_handoff_limit
);
crate::assert_with_log!(
config.browser_worker_offload.min_task_cost == 1,
"browser_worker_offload.min_task_cost",
1,
config.browser_worker_offload.min_task_cost
);
crate::assert_with_log!(
config.browser_worker_offload.max_in_flight == 1,
"browser_worker_offload.max_in_flight",
1,
config.browser_worker_offload.max_in_flight
);
crate::assert_with_log!(
config.cancel_lane_max_streak == 1,
"cancel_lane_max_streak",
1,
config.cancel_lane_max_streak
);
crate::assert_with_log!(
config.governor_interval == 1,
"governor_interval",
1,
config.governor_interval
);
crate::assert_with_log!(
!config.enable_adaptive_cancel_streak,
"enable_adaptive_cancel_streak",
false,
config.enable_adaptive_cancel_streak
);
crate::assert_with_log!(
config.adaptive_cancel_streak_epoch_steps == 1,
"adaptive_cancel_streak_epoch_steps",
1,
config.adaptive_cancel_streak_epoch_steps
);
crate::assert_with_log!(
config.thread_name_prefix == "asupersync-worker",
"thread_name_prefix",
"asupersync-worker",
config.thread_name_prefix
);
crate::assert_with_log!(
config.blocking.max_threads == config.blocking.min_threads,
"blocking normalize",
config.blocking.min_threads,
config.blocking.max_threads
);
}
#[test]
fn test_normalize_enforces_minimums() {
init_test("test_normalize_enforces_minimums");
let mut config = zero_minimums_config();
config.normalize();
assert_normalized_minimums(&config);
crate::test_complete!("test_normalize_enforces_minimums");
}
#[test]
fn test_blocking_pool_normalize() {
init_test("test_blocking_pool_normalize");
let mut blocking = BlockingPoolConfig {
min_threads: 2,
max_threads: 1,
};
blocking.normalize();
crate::assert_with_log!(
blocking.max_threads == blocking.min_threads,
"blocking max>=min",
blocking.min_threads,
blocking.max_threads
);
crate::test_complete!("test_blocking_pool_normalize");
}
#[test]
fn test_leak_escalation_new_clamps_zero_threshold() {
init_test("test_leak_escalation_new_clamps_zero_threshold");
let escalation = LeakEscalation::new(0, ObligationLeakResponse::Panic);
crate::assert_with_log!(
escalation.threshold == 1,
"leak_escalation.threshold",
1,
escalation.threshold
);
crate::assert_with_log!(
escalation.escalate_to == ObligationLeakResponse::Panic,
"leak_escalation.escalate_to",
ObligationLeakResponse::Panic,
escalation.escalate_to
);
crate::test_complete!("test_leak_escalation_new_clamps_zero_threshold");
}
#[test]
fn test_normalize_clamps_zero_leak_escalation_threshold() {
init_test("test_normalize_clamps_zero_leak_escalation_threshold");
let mut config = RuntimeConfig {
leak_escalation: Some(LeakEscalation {
threshold: 0,
escalate_to: ObligationLeakResponse::Recover,
}),
..RuntimeConfig::default()
};
config.normalize();
let escalation = config
.leak_escalation
.expect("leak escalation should remain configured");
crate::assert_with_log!(
escalation.threshold == 1,
"leak_escalation.threshold",
1,
escalation.threshold
);
crate::assert_with_log!(
escalation.escalate_to == ObligationLeakResponse::Recover,
"leak_escalation.escalate_to",
ObligationLeakResponse::Recover,
escalation.escalate_to
);
crate::test_complete!("test_normalize_clamps_zero_leak_escalation_threshold");
}
#[test]
fn test_default_worker_threads_nonzero() {
init_test("test_default_worker_threads_nonzero");
let threads = RuntimeConfig::default_worker_threads();
crate::assert_with_log!(threads >= 1, "default_worker_threads", true, threads >= 1);
crate::test_complete!("test_default_worker_threads_nonzero");
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_normalize_preserves_custom_values() {
init_test("test_normalize_preserves_custom_values");
let mut config = RuntimeConfig {
worker_threads: 4,
thread_stack_size: 1024,
thread_name_prefix: "custom".to_string(),
global_queue_limit: 64,
steal_batch_size: 8,
blocking: BlockingPoolConfig {
min_threads: 2,
max_threads: 4,
},
enable_parking: false,
poll_budget: 32,
browser_ready_handoff_limit: 64,
browser_worker_offload: BrowserWorkerOffloadConfig {
enabled: true,
min_task_cost: 4096,
max_in_flight: 8,
transfer_mode: WorkerTransferMode::TransferableOnly,
cancellation_mode: WorkerCancellationMode::RequireAck,
require_owned_payloads: true,
},
cancel_lane_max_streak: 16,
root_region_limits: None,
on_thread_start: None,
on_thread_stop: None,
deadline_monitor: None,
deadline_warning_handler: None,
metrics_provider: Arc::new(NoOpMetrics),
observability: None,
cancel_attribution: CancelAttributionConfig::new(8, 1024),
obligation_leak_response: ObligationLeakResponse::Silent,
leak_escalation: None,
logical_clock_mode: None,
enable_governor: false,
governor_interval: 7,
enable_adaptive_cancel_streak: true,
adaptive_cancel_streak_epoch_steps: 64,
};
config.normalize();
crate::assert_with_log!(
config.worker_threads == 4,
"worker_threads",
4,
config.worker_threads
);
crate::assert_with_log!(
config.thread_stack_size == 1024,
"thread_stack_size",
1024,
config.thread_stack_size
);
crate::assert_with_log!(
config.thread_name_prefix == "custom",
"thread_name_prefix",
"custom",
config.thread_name_prefix
);
crate::assert_with_log!(
config.steal_batch_size == 8,
"steal_batch_size",
8,
config.steal_batch_size
);
crate::assert_with_log!(
config.poll_budget == 32,
"poll_budget",
32,
config.poll_budget
);
crate::assert_with_log!(
config.browser_ready_handoff_limit == 64,
"browser_ready_handoff_limit",
64,
config.browser_ready_handoff_limit
);
crate::assert_with_log!(
config.browser_worker_offload.enabled,
"browser_worker_offload.enabled",
true,
config.browser_worker_offload.enabled
);
crate::assert_with_log!(
config.browser_worker_offload.min_task_cost == 4096,
"browser_worker_offload.min_task_cost",
4096,
config.browser_worker_offload.min_task_cost
);
crate::assert_with_log!(
config.browser_worker_offload.max_in_flight == 8,
"browser_worker_offload.max_in_flight",
8,
config.browser_worker_offload.max_in_flight
);
crate::assert_with_log!(
config.cancel_lane_max_streak == 16,
"cancel_lane_max_streak",
16,
config.cancel_lane_max_streak
);
crate::assert_with_log!(
config.governor_interval == 7,
"governor_interval",
7,
config.governor_interval
);
crate::assert_with_log!(
config.enable_adaptive_cancel_streak,
"enable_adaptive_cancel_streak",
true,
config.enable_adaptive_cancel_streak
);
crate::assert_with_log!(
config.adaptive_cancel_streak_epoch_steps == 64,
"adaptive_cancel_streak_epoch_steps",
64,
config.adaptive_cancel_streak_epoch_steps
);
crate::assert_with_log!(
config.blocking.max_threads == 4,
"blocking max",
4,
config.blocking.max_threads
);
crate::assert_with_log!(
config.obligation_leak_response == ObligationLeakResponse::Silent,
"obligation_leak_response",
ObligationLeakResponse::Silent,
config.obligation_leak_response
);
crate::test_complete!("test_normalize_preserves_custom_values");
}
#[test]
fn test_browser_worker_offload_defaults() {
init_test("test_browser_worker_offload_defaults");
let cfg = BrowserWorkerOffloadConfig::default();
crate::assert_with_log!(
!cfg.enabled,
"offload disabled by default",
false,
cfg.enabled
);
crate::assert_with_log!(
cfg.min_task_cost == 1024,
"default min task cost",
1024,
cfg.min_task_cost
);
crate::assert_with_log!(
cfg.max_in_flight == 16,
"default max in flight",
16,
cfg.max_in_flight
);
crate::assert_with_log!(
cfg.transfer_mode == WorkerTransferMode::TransferableOnly,
"default transfer mode",
WorkerTransferMode::TransferableOnly,
cfg.transfer_mode
);
crate::assert_with_log!(
cfg.cancellation_mode == WorkerCancellationMode::RequireAck,
"default cancellation mode",
WorkerCancellationMode::RequireAck,
cfg.cancellation_mode
);
crate::assert_with_log!(
cfg.require_owned_payloads,
"default require_owned_payloads",
true,
cfg.require_owned_payloads
);
crate::test_complete!("test_browser_worker_offload_defaults");
}
#[test]
fn test_browser_worker_offload_normalize_clamps_zero_values() {
init_test("test_browser_worker_offload_normalize_clamps_zero_values");
let mut cfg = BrowserWorkerOffloadConfig {
enabled: true,
min_task_cost: 0,
max_in_flight: 0,
transfer_mode: WorkerTransferMode::CloneStructured,
cancellation_mode: WorkerCancellationMode::BestEffortAbort,
require_owned_payloads: false,
};
cfg.normalize();
crate::assert_with_log!(
cfg.min_task_cost == 1,
"min_task_cost",
1,
cfg.min_task_cost
);
crate::assert_with_log!(
cfg.max_in_flight == 1,
"max_in_flight",
1,
cfg.max_in_flight
);
crate::test_complete!("test_browser_worker_offload_normalize_clamps_zero_values");
}
#[test]
fn obligation_leak_response_clone_copy() {
let a = ObligationLeakResponse::Recover;
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
}
#[test]
fn leak_escalation_debug_eq() {
let a = LeakEscalation::new(5, ObligationLeakResponse::Panic);
let b = LeakEscalation::new(5, ObligationLeakResponse::Panic);
assert_eq!(a, b);
let dbg = format!("{a:?}");
assert!(dbg.contains("LeakEscalation"), "{dbg}");
}
#[test]
fn leak_escalation_clone_copy() {
let a = LeakEscalation::new(10, ObligationLeakResponse::Log);
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
}
#[test]
fn blocking_pool_config_default() {
let bp = BlockingPoolConfig::default();
assert_eq!(bp.min_threads, 0);
assert_eq!(bp.max_threads, 0);
}
#[test]
fn blocking_pool_config_clone() {
let bp = BlockingPoolConfig {
min_threads: 2,
max_threads: 8,
};
let cloned = bp;
assert_eq!(cloned.min_threads, 2);
assert_eq!(cloned.max_threads, 8);
}
#[test]
fn runtime_config_clone() {
let config = RuntimeConfig::default();
let cloned = config.clone();
assert_eq!(cloned.worker_threads, config.worker_threads);
assert_eq!(cloned.poll_budget, config.poll_budget);
assert_eq!(
cloned.obligation_leak_response,
config.obligation_leak_response
);
}
#[test]
fn test_obligation_leak_response_variants() {
init_test("test_obligation_leak_response_variants");
let variants = [
ObligationLeakResponse::Panic,
ObligationLeakResponse::Log,
ObligationLeakResponse::Silent,
ObligationLeakResponse::Recover,
];
for (i, a) in variants.iter().enumerate() {
for (j, b) in variants.iter().enumerate() {
if i == j {
crate::assert_with_log!(*a == *b, "same variant eq", true, *a == *b);
} else {
crate::assert_with_log!(*a != *b, "diff variant ne", true, *a != *b);
}
}
let dbg = format!("{a:?}");
crate::assert_with_log!(!dbg.is_empty(), "Debug non-empty", true, !dbg.is_empty());
}
crate::test_complete!("test_obligation_leak_response_variants");
}
#[test]
fn test_leak_escalation_preserves_nonzero() {
init_test("test_leak_escalation_preserves_nonzero");
let escalation = LeakEscalation::new(10, ObligationLeakResponse::Recover);
crate::assert_with_log!(
escalation.threshold == 10,
"threshold preserved",
10,
escalation.threshold
);
crate::assert_with_log!(
escalation.escalate_to == ObligationLeakResponse::Recover,
"escalate_to",
ObligationLeakResponse::Recover,
escalation.escalate_to
);
crate::test_complete!("test_leak_escalation_preserves_nonzero");
}
#[test]
fn test_default_governor_settings() {
init_test("test_default_governor_settings");
let config = RuntimeConfig::default();
crate::assert_with_log!(
!config.enable_governor,
"governor disabled by default",
false,
config.enable_governor
);
crate::assert_with_log!(
config.governor_interval == 32,
"default governor interval",
32,
config.governor_interval
);
crate::assert_with_log!(
config.enable_adaptive_cancel_streak,
"adaptive cancel streak enabled by default",
true,
config.enable_adaptive_cancel_streak
);
crate::assert_with_log!(
config.adaptive_cancel_streak_epoch_steps == 128,
"adaptive cancel streak default epoch",
128,
config.adaptive_cancel_streak_epoch_steps
);
crate::test_complete!("test_default_governor_settings");
}
}