use super::*;
/// Derives the key that identifies a runtime broker for one upstream configuration.
///
/// The upstream base URL, the code-review flag and `RUNTIME_PROXY_OPENAI_MOUNT_PATH` are fed,
/// in that order, into a single `DefaultHasher`. The resulting 64-bit digest is returned as
/// 16 zero-padded lowercase hexadecimal digits.
pub(crate) fn runtime_broker_key(upstream_base_url: &str, include_code_review: bool) -> String {
    let mut state = DefaultHasher::new();
    upstream_base_url.hash(&mut state);
    include_code_review.hash(&mut state);
    RUNTIME_PROXY_OPENAI_MOUNT_PATH.hash(&mut state);
    let digest = state.finish();
    format!("{digest:016x}")
}
/// Returns the version of this crate as recorded at compile time.
///
/// The value comes from the `CARGO_PKG_VERSION` environment variable that is read by
/// `env!` while the crate is being built.
pub(crate) fn runtime_current_prodex_version() -> &'static str {
    const CURRENT_VERSION: &str = env!("CARGO_PKG_VERSION");
    CURRENT_VERSION
}
/// Reports whether a process with the given pid currently appears to be running.
///
/// An existing `/proc/{pid}` entry is accepted as proof of life. When that path does not
/// exist, the rows returned by `collect_process_rows` are searched for a matching pid.
pub(crate) fn runtime_process_pid_alive(pid: u32) -> bool {
    let procfs_entry = format!("/proc/{pid}");
    Path::new(&procfs_entry).exists()
        || collect_process_rows()
            .into_iter()
            .any(|row| row.pid == pid)
}
/// Builds a token of the form `{prefix}-{pid}-{nanos}-{sequence}`.
///
/// No random source is involved: the token combines the current process id, the nanoseconds
/// elapsed since the Unix epoch (hex; zero when the clock reads earlier than the epoch) and
/// the previous value of the `STATE_SAVE_SEQUENCE` counter (hex), which is incremented by
/// one on every call.
pub(crate) fn runtime_random_token(prefix: &str) -> String {
    let epoch_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let ticket = STATE_SAVE_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let process_id = std::process::id();
    format!("{prefix}-{process_id}-{epoch_nanos:x}-{ticket:x}")
}
/// Returns the number of seconds a broker is given to start up.
///
/// The ready timeout (milliseconds, from `runtime_broker_ready_timeout_ms`) is rounded up to
/// whole seconds and padded by one second; the result is never smaller than
/// `RUNTIME_BROKER_IDLE_GRACE_SECONDS`.
pub(crate) fn runtime_broker_startup_grace_seconds() -> i64 {
    let ready_timeout_ms = runtime_broker_ready_timeout_ms();
    let padded_seconds = (ready_timeout_ms.div_ceil(1_000) as i64).saturating_add(1);
    std::cmp::max(padded_seconds, RUNTIME_BROKER_IDLE_GRACE_SECONDS)
}
/// Loads the registry stored for `broker_key`, consulting the last-good backup as well.
///
/// Returns `Ok(None)` when neither the registry file nor its backup exists. That check is
/// made before loading and again if loading fails, so files that disappear in between are
/// reported as "no registry" rather than as an error. Any other load failure is returned
/// unchanged.
pub(crate) fn load_runtime_broker_registry(
    paths: &AppPaths,
    broker_key: &str,
) -> Result<Option<RuntimeBrokerRegistry>> {
    let primary = runtime_broker_registry_file_path(paths, broker_key);
    let last_good = runtime_broker_registry_last_good_file_path(paths, broker_key);
    let both_missing = || !primary.exists() && !last_good.exists();
    if both_missing() {
        return Ok(None);
    }
    load_json_file_with_backup::<RuntimeBrokerRegistry>(&primary, &last_good)
        .map(|loaded| Some(loaded.value))
        .or_else(|err| if both_missing() { Ok(None) } else { Err(err) })
}
/// Writes `registry` as pretty-printed JSON to the registry file for `broker_key`.
///
/// The parent directory is created first when the path has one. The write itself is
/// delegated to `write_json_file_with_backup`, which receives the last-good backup path and
/// a validator that checks the content deserializes back into a `RuntimeBrokerRegistry`.
///
/// # Errors
///
/// Fails when the directory cannot be created, the registry cannot be serialized, or
/// `write_json_file_with_backup` reports an error.
pub(crate) fn save_runtime_broker_registry(
    paths: &AppPaths,
    broker_key: &str,
    registry: &RuntimeBrokerRegistry,
) -> Result<()> {
    let registry_path = runtime_broker_registry_file_path(paths, broker_key);
    if let Some(directory) = registry_path.parent() {
        fs::create_dir_all(directory)
            .with_context(|| format!("failed to create {}", directory.display()))?;
    }
    let serialized = serde_json::to_string_pretty(registry)
        .context("failed to serialize runtime broker registry")?;
    let last_good_path = runtime_broker_registry_last_good_file_path(paths, broker_key);
    write_json_file_with_backup(&registry_path, &last_good_path, &serialized, |content| {
        serde_json::from_str::<RuntimeBrokerRegistry>(content)
            .context("failed to validate runtime broker registry")?;
        Ok(())
    })
}
/// Deletes the registry file and its last-good backup, but only when the stored registry
/// carries the given `instance_token`.
///
/// Nothing happens when the registry cannot be loaded, is absent, or has a different token.
/// Failures while removing the files are ignored.
pub(crate) fn remove_runtime_broker_registry_if_token_matches(
    paths: &AppPaths,
    broker_key: &str,
    instance_token: &str,
) {
    match load_runtime_broker_registry(paths, broker_key) {
        Ok(Some(existing)) if existing.instance_token == instance_token => {}
        _ => return,
    }
    // Best effort: either file may already be gone.
    let _ = fs::remove_file(runtime_broker_registry_file_path(paths, broker_key));
    let _ = fs::remove_file(runtime_broker_registry_last_good_file_path(
        paths, broker_key,
    ));
}
/// Builds the legacy OpenAI mount path for a prodex version by appending `version` to
/// `LEGACY_RUNTIME_PROXY_OPENAI_MOUNT_PATH_PREFIX`.
pub(crate) fn legacy_runtime_proxy_openai_mount_path(version: &str) -> String {
    format!(
        "{}{}",
        LEGACY_RUNTIME_PROXY_OPENAI_MOUNT_PATH_PREFIX, version
    )
}
/// Extracts the version from `<binary> --version` style output.
///
/// The output is split on whitespace; the first token must be exactly `prodex` and the
/// second token is returned as the version. Further tokens are ignored. Returns `None` when
/// fewer than two tokens are present or the first token is not `prodex`.
pub(crate) fn parse_prodex_version_output(output: &str) -> Option<String> {
    let mut tokens = output.split_whitespace();
    match (tokens.next(), tokens.next()) {
        (Some("prodex"), Some(version)) if !version.is_empty() => Some(version.to_owned()),
        _ => None,
    }
}
/// Runs `executable --version` and parses the prodex version from its standard output.
///
/// The child is started with stdin and stderr attached to the null device. Its stdout is
/// decoded lossily as UTF-8 and handed to `parse_prodex_version_output`.
///
/// # Errors
///
/// Fails when the process cannot be run, exits unsuccessfully (the message carries the exit
/// code, or `signal` when there is none), or prints output that does not parse.
pub(crate) fn read_prodex_version_from_executable(executable: &Path) -> Result<String> {
    let mut command = Command::new(executable);
    command
        .arg("--version")
        .stdin(Stdio::null())
        .stderr(Stdio::null());
    let output = command
        .output()
        .with_context(|| format!("failed to run {} --version", executable.display()))?;
    if !output.status.success() {
        let status = match output.status.code() {
            Some(code) => code.to_string(),
            None => "signal".to_string(),
        };
        bail!(
            "{} --version exited with status {}",
            executable.display(),
            status
        );
    }
    let printed = String::from_utf8_lossy(&output.stdout);
    parse_prodex_version_output(&printed).with_context(|| {
        format!(
            "failed to parse prodex version output from {}",
            executable.display()
        )
    })
}
/// Tries to determine the executable path of the process with the given pid.
///
/// The target of the `/proc/{pid}/exe` link is used when it can be read. Otherwise the
/// process row for the pid (from `collect_process_rows`) is consulted and the last of its
/// arguments that names an existing filesystem path is returned. Returns `None` when neither
/// source yields a path.
pub(crate) fn runtime_process_executable_path(pid: u32) -> Option<PathBuf> {
    if let Ok(link_target) = fs::read_link(format!("/proc/{pid}/exe")) {
        return Some(link_target);
    }
    let row = collect_process_rows()
        .into_iter()
        .find(|row| row.pid == pid)?;
    row.args
        .into_iter()
        .rev()
        .find(|arg| Path::new(arg).exists())
        .map(PathBuf::from)
}
/// Tries to determine which prodex version the process with the given pid is running.
///
/// Candidate executables are gathered in order: first the path from
/// `runtime_process_executable_path`, then every argument of the process row that names an
/// existing path and is not already a candidate. Each candidate is asked for its version via
/// `read_prodex_version_from_executable`; the first successful answer wins. Returns `None`
/// when no candidate reports a version.
pub(crate) fn runtime_process_prodex_version(pid: u32) -> Option<String> {
    let mut candidates: Vec<PathBuf> = runtime_process_executable_path(pid).into_iter().collect();
    let matching_row = collect_process_rows()
        .into_iter()
        .find(|row| row.pid == pid);
    for arg in matching_row.into_iter().flat_map(|row| row.args) {
        let candidate = PathBuf::from(&arg);
        if candidate.exists() && !candidates.contains(&candidate) {
            candidates.push(candidate);
        }
    }
    for executable in candidates {
        if let Ok(version) = read_prodex_version_from_executable(&executable) {
            return Some(version);
        }
    }
    None
}
/// Returns `true` when the broker process recorded in `registry` is alive and reports the
/// same prodex version as this build.
///
/// A dead process, or one whose version cannot be determined, yields `false`.
pub(crate) fn runtime_broker_matches_current_prodex(registry: &RuntimeBrokerRegistry) -> bool {
    if !runtime_process_pid_alive(registry.pid) {
        return false;
    }
    let broker_version = runtime_process_prodex_version(registry.pid);
    broker_version.as_deref() == Some(runtime_current_prodex_version())
}
/// Asks the process with the given pid to exit, escalating if it does not.
///
/// Does nothing when the process is not alive. Otherwise `kill -TERM` is sent and the
/// process is polled every 20 ms for up to 500 ms. If it is still alive, `kill -KILL` is
/// sent and the process is polled for up to a further 250 ms. Failures to run `kill` are
/// ignored, and the function returns nothing, so callers cannot tell whether the process
/// actually exited.
pub(crate) fn terminate_runtime_process(pid: u32) {
    if !runtime_process_pid_alive(pid) {
        return;
    }
    let pid_arg = pid.to_string();
    let send_signal = |signal: &str| {
        let mut kill = Command::new("kill");
        kill.arg(signal)
            .arg(pid_arg.as_str())
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        // Best effort: a failure to spawn or run `kill` is deliberately ignored.
        let _ = kill.status();
    };
    let exited_within = |timeout: Duration| -> bool {
        let started_at = Instant::now();
        loop {
            if started_at.elapsed() >= timeout {
                // One last check after the deadline has passed.
                return !runtime_process_pid_alive(pid);
            }
            if !runtime_process_pid_alive(pid) {
                return true;
            }
            thread::sleep(Duration::from_millis(20));
        }
    };
    send_signal("-TERM");
    if !exited_within(Duration::from_millis(500)) {
        send_signal("-KILL");
        let _ = exited_within(Duration::from_millis(250));
    }
}
/// Shuts down a running broker whose prodex version differs from this build.
///
/// Returns `false` without touching anything when the broker process recorded in `registry`
/// is not alive, or when it is alive and matches the current prodex version. Otherwise the
/// process is terminated, the registry files are removed provided they still carry this
/// registry's instance token, and `true` is returned.
pub(crate) fn replace_runtime_broker_if_version_mismatch(
    paths: &AppPaths,
    broker_key: &str,
    registry: &RuntimeBrokerRegistry,
) -> bool {
    if !runtime_process_pid_alive(registry.pid) || runtime_broker_matches_current_prodex(registry) {
        return false;
    }
    terminate_runtime_process(registry.pid);
    // The token guard keeps us from deleting a registry that a newer broker instance may
    // have written in the meantime.
    remove_runtime_broker_registry_if_token_matches(paths, broker_key, &registry.instance_token);
    true
}
/// Resolves the OpenAI mount path served by the broker described by `registry`.
///
/// The mount path recorded in the registry is returned when present. Otherwise the prodex
/// version of the broker process is looked up and the legacy, version-based mount path is
/// derived from it.
///
/// # Errors
///
/// Fails when the registry has no mount path and the broker's version cannot be resolved.
pub(crate) fn runtime_broker_openai_mount_path(registry: &RuntimeBrokerRegistry) -> Result<String> {
    match registry.openai_mount_path.as_deref() {
        Some(recorded) => Ok(recorded.to_string()),
        None => {
            let broker_version =
                runtime_process_prodex_version(registry.pid).with_context(|| {
                    format!(
                        "failed to resolve prodex version for runtime broker pid {}",
                        registry.pid
                    )
                })?;
            Ok(legacy_runtime_proxy_openai_mount_path(&broker_version))
        }
    }
}
/// Creates a lease file for the current process in the lease directory of `broker_key`.
///
/// This is a thin wrapper around `create_runtime_broker_lease_in_dir_for_pid` that supplies
/// the broker's lease directory and this process's id.
pub(crate) fn create_runtime_broker_lease(
    paths: &AppPaths,
    broker_key: &str,
) -> Result<RuntimeBrokerLease> {
    let own_pid = std::process::id();
    create_runtime_broker_lease_in_dir_for_pid(&runtime_broker_lease_dir(paths, broker_key), own_pid)
}
pub(crate) fn create_runtime_broker_lease_in_dir_for_pid(
lease_dir: &Path,
pid: u32,
) -> Result<RuntimeBrokerLease> {
fs::create_dir_all(lease_dir)
.with_context(|| format!("failed to create {}", lease_dir.display()))?;
let path = lease_dir.join(format!("{}-{}.lease", pid, runtime_random_token("lease")));
fs::write(&path, format!("pid={pid}\n"))
.with_context(|| format!("failed to write {}", path.display()))?;
Ok(RuntimeBrokerLease { path })
}
pub(crate) fn cleanup_runtime_broker_stale_leases(paths: &AppPaths, broker_key: &str) -> usize {
let lease_dir = runtime_broker_lease_dir(paths, broker_key);
let Ok(entries) = fs::read_dir(&lease_dir) else {
return 0;
};
let mut live = 0usize;
for entry in entries.flatten() {
let path = entry.path();
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
let pid = file_name
.split('-')
.next()
.and_then(|value| value.parse::<u32>().ok());
if pid.is_some_and(runtime_process_pid_alive) {
live += 1;
} else {
let _ = fs::remove_file(path);
}
}
live
}
/// Builds a `RuntimeProxyEndpoint` for the broker described by `registry`.
///
/// A lease for the current process is created first and stored in the returned endpoint.
/// The registry's listen address is then parsed and the broker's OpenAI mount path resolved.
///
/// # Errors
///
/// Fails when the lease cannot be created, the listen address does not parse, or the mount
/// path cannot be resolved.
pub(crate) fn runtime_proxy_endpoint_from_registry(
    paths: &AppPaths,
    broker_key: &str,
    registry: &RuntimeBrokerRegistry,
) -> Result<RuntimeProxyEndpoint> {
    // Take the lease before anything else, matching the order of the fallible steps below.
    let lease = create_runtime_broker_lease(paths, broker_key)?;
    let lease_dir = runtime_broker_lease_dir(paths, broker_key);
    let listen_addr = registry.listen_addr.parse().with_context(|| {
        format!(
            "invalid runtime broker listen address {}",
            registry.listen_addr
        )
    })?;
    let openai_mount_path = runtime_broker_openai_mount_path(registry)?;
    Ok(RuntimeProxyEndpoint {
        listen_addr,
        openai_mount_path,
        lease_dir,
        _lease: Some(lease),
    })
}