use super::*;
#[cfg(all(target_os = "linux", target_env = "gnu", not(test)))]
unsafe extern "C" {
fn malloc_trim(pad: usize) -> i32;
}
pub(super) fn runtime_proxy_log_dir() -> PathBuf {
env::var_os("PRODEX_RUNTIME_LOG_DIR")
.filter(|value| !value.is_empty())
.map(PathBuf::from)
.or_else(|| runtime_policy_runtime().and_then(|policy| policy.log_dir))
.unwrap_or_else(env::temp_dir)
}
pub(super) fn runtime_proxy_log_format() -> RuntimeLogFormat {
env::var("PRODEX_RUNTIME_LOG_FORMAT")
.ok()
.and_then(|value| RuntimeLogFormat::parse(&value))
.or_else(|| runtime_policy_runtime().and_then(|policy| policy.log_format))
.unwrap_or(RuntimeLogFormat::Text)
}
pub(super) fn create_runtime_proxy_log_path() -> PathBuf {
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let sequence = RUNTIME_PROXY_LOG_SEQUENCE.fetch_add(1, Ordering::Relaxed);
let dir = runtime_proxy_log_dir();
let _ = fs::create_dir_all(&dir);
dir.join(format!(
"{RUNTIME_PROXY_LOG_FILE_PREFIX}-{}-{millis}-{sequence}.log",
std::process::id()
))
}
pub(super) fn runtime_proxy_latest_log_pointer_path() -> PathBuf {
runtime_proxy_log_dir().join(RUNTIME_PROXY_LATEST_LOG_POINTER)
}
pub(super) fn initialize_runtime_proxy_log_path() -> PathBuf {
cleanup_runtime_proxy_log_housekeeping();
let log_path = create_runtime_proxy_log_path();
let _ = fs::write(
runtime_proxy_latest_log_pointer_path(),
format!("{}\n", log_path.display()),
);
runtime_proxy_log_to_path(
&log_path,
&format!(
"runtime proxy log initialized pid={} cwd={}",
std::process::id(),
std::env::current_dir()
.ok()
.map(|path| path.display().to_string())
.unwrap_or_else(|| "<unknown>".to_string())
),
);
log_path
}
pub(super) fn runtime_proxy_worker_count_default(parallelism: usize) -> usize {
parallelism.clamp(4, 12)
}
pub(super) fn runtime_proxy_worker_count() -> usize {
let parallelism = thread::available_parallelism()
.map(|count| count.get())
.unwrap_or(4);
usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_WORKER_COUNT",
runtime_policy_proxy().and_then(|policy| policy.worker_count),
runtime_proxy_worker_count_default(parallelism),
)
.clamp(1, 64)
}
pub(super) fn runtime_proxy_long_lived_worker_count_default(parallelism: usize) -> usize {
parallelism.saturating_mul(2).clamp(8, 24)
}
pub(super) fn runtime_proxy_long_lived_worker_count() -> usize {
let parallelism = thread::available_parallelism()
.map(|count| count.get())
.unwrap_or(4);
usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_LONG_LIVED_WORKER_COUNT",
runtime_policy_proxy().and_then(|policy| policy.long_lived_worker_count),
runtime_proxy_long_lived_worker_count_default(parallelism),
)
.clamp(1, 256)
}
pub(super) fn runtime_probe_refresh_worker_count() -> usize {
let parallelism = thread::available_parallelism()
.map(|count| count.get())
.unwrap_or(4);
usize_override_with_policy(
"PRODEX_RUNTIME_PROBE_REFRESH_WORKER_COUNT",
runtime_policy_proxy().and_then(|policy| policy.probe_refresh_worker_count),
parallelism.clamp(2, 4),
)
.clamp(1, 8)
}
pub(super) fn runtime_proxy_async_worker_count_default(parallelism: usize) -> usize {
parallelism.clamp(2, 4)
}
pub(super) fn runtime_proxy_async_worker_count() -> usize {
let parallelism = thread::available_parallelism()
.map(|count| count.get())
.unwrap_or(4);
usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_ASYNC_WORKER_COUNT",
runtime_policy_proxy().and_then(|policy| policy.async_worker_count),
runtime_proxy_async_worker_count_default(parallelism),
)
.clamp(2, 8)
}
pub(super) fn runtime_proxy_long_lived_queue_capacity(worker_count: usize) -> usize {
let default_capacity = worker_count.saturating_mul(8).clamp(128, 1024);
usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_LONG_LIVED_QUEUE_CAPACITY",
runtime_policy_proxy().and_then(|policy| policy.long_lived_queue_capacity),
default_capacity,
)
.max(1)
}
pub(super) fn runtime_proxy_active_request_limit_default(
worker_count: usize,
long_lived_worker_count: usize,
) -> usize {
worker_count
.saturating_add(long_lived_worker_count.saturating_mul(3))
.clamp(64, 512)
}
pub(super) fn runtime_proxy_active_request_limit(
worker_count: usize,
long_lived_worker_count: usize,
) -> usize {
usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_ACTIVE_REQUEST_LIMIT",
runtime_policy_proxy().and_then(|policy| policy.active_request_limit),
runtime_proxy_active_request_limit_default(worker_count, long_lived_worker_count),
)
.max(1)
}
fn runtime_heap_trim_last_requested_at_ms() -> &'static AtomicU64 {
static LAST_REQUESTED_AT_MS: OnceLock<AtomicU64> = OnceLock::new();
LAST_REQUESTED_AT_MS.get_or_init(|| AtomicU64::new(0))
}
#[cfg(test)]
fn runtime_heap_trim_request_count_cell() -> &'static AtomicUsize {
static REQUEST_COUNT: OnceLock<AtomicUsize> = OnceLock::new();
REQUEST_COUNT.get_or_init(|| AtomicUsize::new(0))
}
fn runtime_heap_trim_reserve(now_ms: u64) -> bool {
let cell = runtime_heap_trim_last_requested_at_ms();
loop {
let previous = cell.load(Ordering::Relaxed);
if RUNTIME_PROXY_HEAP_TRIM_MIN_INTERVAL_MS > 0
&& previous > 0
&& now_ms.saturating_sub(previous) < RUNTIME_PROXY_HEAP_TRIM_MIN_INTERVAL_MS
{
return false;
}
match cell.compare_exchange(previous, now_ms, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => return true,
Err(_) => continue,
}
}
}
pub(crate) fn runtime_maybe_trim_process_heap(released_bytes: usize) -> bool {
if released_bytes < RUNTIME_PROXY_HEAP_TRIM_MIN_RELEASE_BYTES {
return false;
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
if !runtime_heap_trim_reserve(now_ms) {
return false;
}
#[cfg(test)]
{
runtime_heap_trim_request_count_cell().fetch_add(1, Ordering::SeqCst);
true
}
#[cfg(all(target_os = "linux", target_env = "gnu", not(test)))]
unsafe {
let _ = malloc_trim(0);
true
}
#[cfg(not(any(test, all(target_os = "linux", target_env = "gnu"))))]
{
false
}
}
#[cfg(test)]
pub(crate) fn reset_runtime_heap_trim_request_count() {
runtime_heap_trim_request_count_cell().store(0, Ordering::SeqCst);
runtime_heap_trim_last_requested_at_ms().store(0, Ordering::Relaxed);
}
#[cfg(test)]
pub(crate) fn runtime_heap_trim_request_count() -> usize {
runtime_heap_trim_request_count_cell().load(Ordering::SeqCst)
}
#[derive(Debug, Clone, Copy)]
pub(super) struct RuntimeProxyLaneLimits {
pub(super) responses: usize,
pub(super) compact: usize,
pub(super) websocket: usize,
pub(super) standard: usize,
}
#[derive(Debug, Clone)]
pub(super) struct RuntimeProxyLaneAdmission {
pub(super) responses_active: Arc<AtomicUsize>,
pub(super) compact_active: Arc<AtomicUsize>,
pub(super) websocket_active: Arc<AtomicUsize>,
pub(super) standard_active: Arc<AtomicUsize>,
pub(super) wait: Arc<(Mutex<()>, Condvar)>,
pub(super) inflight_release_revision: Arc<AtomicU64>,
pub(super) limits: RuntimeProxyLaneLimits,
}
impl RuntimeProxyLaneAdmission {
pub(super) fn new(limits: RuntimeProxyLaneLimits) -> Self {
Self {
responses_active: Arc::new(AtomicUsize::new(0)),
compact_active: Arc::new(AtomicUsize::new(0)),
websocket_active: Arc::new(AtomicUsize::new(0)),
standard_active: Arc::new(AtomicUsize::new(0)),
wait: Arc::new((Mutex::new(()), Condvar::new())),
inflight_release_revision: Arc::new(AtomicU64::new(0)),
limits,
}
}
pub(super) fn active_counter(&self, lane: RuntimeRouteKind) -> Arc<AtomicUsize> {
match lane {
RuntimeRouteKind::Responses => Arc::clone(&self.responses_active),
RuntimeRouteKind::Compact => Arc::clone(&self.compact_active),
RuntimeRouteKind::Websocket => Arc::clone(&self.websocket_active),
RuntimeRouteKind::Standard => Arc::clone(&self.standard_active),
}
}
pub(super) fn limit(&self, lane: RuntimeRouteKind) -> usize {
match lane {
RuntimeRouteKind::Responses => self.limits.responses,
RuntimeRouteKind::Compact => self.limits.compact,
RuntimeRouteKind::Websocket => self.limits.websocket,
RuntimeRouteKind::Standard => self.limits.standard,
}
}
}
pub(super) fn runtime_proxy_lane_limits(
global_limit: usize,
worker_count: usize,
long_lived_worker_count: usize,
) -> RuntimeProxyLaneLimits {
let global_limit = global_limit.max(1);
RuntimeProxyLaneLimits {
responses: usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_RESPONSES_ACTIVE_LIMIT",
runtime_policy_proxy().and_then(|policy| policy.responses_active_limit),
(global_limit.saturating_mul(3) / 4).clamp(4, global_limit),
)
.min(global_limit)
.max(1),
compact: usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_COMPACT_ACTIVE_LIMIT",
runtime_policy_proxy().and_then(|policy| policy.compact_active_limit),
(global_limit / 4).clamp(2, 6).min(global_limit),
)
.min(global_limit)
.max(1),
websocket: usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_WEBSOCKET_ACTIVE_LIMIT",
runtime_policy_proxy().and_then(|policy| policy.websocket_active_limit),
long_lived_worker_count.clamp(2, global_limit),
)
.min(global_limit)
.max(1),
standard: usize_override_with_policy(
"PRODEX_RUNTIME_PROXY_STANDARD_ACTIVE_LIMIT",
runtime_policy_proxy().and_then(|policy| policy.standard_active_limit),
(worker_count / 2).clamp(2, 8).min(global_limit),
)
.min(global_limit)
.max(1),
}
}
pub(super) fn runtime_proxy_log_fields(message: &str) -> BTreeMap<String, String> {
let mut fields = BTreeMap::new();
for token in message.split_whitespace() {
let Some((key, value)) = token.split_once('=') else {
continue;
};
if key.is_empty() || value.is_empty() {
continue;
}
fields.insert(key.to_string(), value.trim_matches('"').to_string());
}
fields
}
pub(super) fn runtime_proxy_log_event(message: &str) -> Option<&str> {
message
.split_whitespace()
.find(|token| !token.contains('='))
.filter(|token| !token.is_empty())
}
pub(super) fn runtime_proxy_log_to_path(log_path: &Path, message: &str) {
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S%.3f %:z");
let sanitized = message.replace(['\r', '\n'], " ");
let line = match runtime_proxy_log_format() {
RuntimeLogFormat::Text => format!("[{timestamp}] {sanitized}\n"),
RuntimeLogFormat::Json => {
let mut value = serde_json::Map::new();
value.insert(
"timestamp".to_string(),
serde_json::Value::String(timestamp.to_string()),
);
value.insert(
"pid".to_string(),
serde_json::Value::Number(std::process::id().into()),
);
value.insert(
"message".to_string(),
serde_json::Value::String(sanitized.clone()),
);
if let Some(event) = runtime_proxy_log_event(&sanitized) {
value.insert(
"event".to_string(),
serde_json::Value::String(event.to_string()),
);
}
let fields = runtime_proxy_log_fields(&sanitized);
if !fields.is_empty() {
value.insert(
"fields".to_string(),
serde_json::Value::Object(
fields
.into_iter()
.map(|(key, value)| (key, serde_json::Value::String(value)))
.collect(),
),
);
}
match serde_json::to_string(&serde_json::Value::Object(value)) {
Ok(serialized) => format!("{serialized}\n"),
Err(_) => format!("[{timestamp}] {sanitized}\n"),
}
}
};
if let Ok(mut file) = fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
{
let _ = file.write_all(line.as_bytes());
let _ = file.flush();
}
}
#[derive(Debug)]
pub(super) struct JsonFileLock {
pub(super) file: fs::File,
}
impl Drop for JsonFileLock {
fn drop(&mut self) {
let _ = self.file.unlock();
}
}