use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lru::LruCache;
use parking_lot::Mutex as FastMutex;
use tokio::sync::{Mutex, Notify};
use uuid::Uuid;
use crate::types::{ConnectionInfo, ProcessInfo};
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
mod socket_pid;
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
mod unsupported;
#[cfg(target_os = "windows")]
mod windows;
#[cfg(target_os = "linux")]
#[allow(unused_imports)]
pub(crate) use linux::PlatformProcessAttributor;
#[cfg(target_os = "macos")]
#[allow(unused_imports)]
pub(crate) use macos::PlatformProcessAttributor;
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
#[allow(unused_imports)]
pub(crate) use unsupported::PlatformProcessAttributor;
#[cfg(target_os = "windows")]
#[allow(unused_imports)]
pub(crate) use windows::PlatformProcessAttributor;
/// A specific process instance: its PID paired with a platform-supplied start token.
///
/// Two identities with the same `pid` but different `start_token`s are treated as different
/// processes; the lookup service uses that difference to flag PID reuse.
/// NOTE(review): the token is presumably derived from the process start time — confirm per
/// platform implementation.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct ProcessIdentity {
    /// Operating-system process id.
    pub(crate) pid: u32,
    /// Opaque token distinguishing processes that were assigned the same `pid`.
    pub(crate) start_token: String,
}
/// Platform backend that maps a network connection to the local process that owns it.
///
/// Only [`lookup`](Self::lookup) must be implemented. The identity-based methods default to
/// `None`; in that case `ProcessLookupService` skips the identity cache and calls `lookup`
/// on every cache miss.
pub(crate) trait ProcessAttributor: Send + Sync + 'static {
    /// Resolves the process that owns `connection`, or `None` if it cannot be determined.
    fn lookup<'a>(
        &'a self,
        connection: &'a ConnectionInfo,
    ) -> Pin<Box<dyn Future<Output = Option<ProcessInfo>> + Send + 'a>>;

    /// Resolves the [`ProcessIdentity`] (PID + start token) of the owner of `connection`.
    ///
    /// The default implementation reports that no identity is available.
    fn lookup_identity<'a>(
        &'a self,
        _connection: &'a ConnectionInfo,
    ) -> Pin<Box<dyn Future<Output = Option<ProcessIdentity>> + Send + 'a>> {
        Box::pin(std::future::ready(None))
    }

    /// Resolves process details directly from a previously obtained identity.
    ///
    /// The default implementation returns `None`, which makes the service fall back to
    /// [`lookup`](Self::lookup).
    fn lookup_by_identity<'a>(
        &'a self,
        _identity: &'a ProcessIdentity,
    ) -> Pin<Box<dyn Future<Output = Option<ProcessInfo>> + Send + 'a>> {
        Box::pin(std::future::ready(None))
    }
}
/// Which cache level, if any, satisfied a process lookup.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ProcessCachePath {
    /// Served from the per-connection result cache.
    ConnectionHit,
    /// Served from the per-process-identity cache.
    IdentityHit,
    /// No cache hit: the attributor was queried (or the query timed out).
    Miss,
}
/// Value stored in the per-connection cache for one connection id.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CachedLookupResult {
    /// Process resolved for the connection, if any.
    process_info: Option<ProcessInfo>,
    /// `true` when this entry records a lookup that exceeded the service timeout.
    timed_out: bool,
}
/// Outcome of `ProcessLookupService::resolve_with_status`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct ProcessLookupResult {
    /// Owning process, or `None` if it could not be determined (including on timeout).
    pub(crate) process_info: Option<ProcessInfo>,
    /// `true` if this lookup, or the cached lookup it was served from, hit the timeout.
    pub(crate) timed_out: bool,
    /// Which cache level, if any, satisfied the request.
    pub(crate) cache_path: ProcessCachePath,
    /// `true` if the PID's start token differed from the one previously cached for it.
    pub(crate) pid_reuse_detected: bool,
    /// Evictions reported by the cache inserts this lookup performed (0 on a connection hit).
    pub(crate) cache_evictions: u32,
}
/// Result of resolving a connection without the connection cache, before that cache is
/// updated with it.
#[derive(Clone, Debug, Eq, PartialEq)]
struct UncachedLookupOutcome {
    /// Resolved process, if any.
    process_info: Option<ProcessInfo>,
    /// `IdentityHit` when served from the identity cache, otherwise `Miss`.
    cache_path: ProcessCachePath,
    /// `true` if registering the identity's start token revealed PID reuse.
    pid_reuse_detected: bool,
    /// Evictions reported by identity / PID-token cache inserts during the resolution.
    cache_evictions: u32,
}
/// A cached value stamped with the instant it was stored, used for TTL checks.
#[derive(Clone, Debug)]
struct CacheEntry<V> {
    /// The cached payload.
    value: V,
    /// When the entry was created; compared against the cache TTL on read.
    cached_at: Instant,
}

impl<V> CacheEntry<V> {
    /// Wraps `value`, recording the current instant as its insertion time.
    fn new(value: V) -> Self {
        let cached_at = Instant::now();
        CacheEntry { value, cached_at }
    }
}
/// Resolves connection → process attribution through a [`ProcessAttributor`], bounding each
/// uncached resolution by `timeout` and caching results at three levels.
///
/// The three levels are per connection, per process identity, and last-seen start token per
/// PID. Concurrent lookups for the same connection are coalesced through `in_flight`.
#[derive(Debug)]
pub(crate) struct ProcessLookupService<A: ProcessAttributor> {
    /// Platform backend performing the actual lookups.
    attributor: Arc<A>,
    /// Upper bound on one uncached resolution; exceeding it yields a timed-out result.
    timeout: Duration,
    /// Maximum age of a cached entry; `None` disables age-based expiry.
    cache_ttl: Option<Duration>,
    /// Final result per connection id, including timed-out outcomes.
    connection_cache: Mutex<LruCache<Uuid, CacheEntry<CachedLookupResult>>>,
    /// Resolved process info keyed by process identity.
    identity_cache: Mutex<LruCache<ProcessIdentity, CacheEntry<ProcessInfo>>>,
    /// Most recent start token seen per PID, used to detect PID reuse.
    pid_start_tokens: Mutex<LruCache<u32, CacheEntry<String>>>,
    /// Connection ids with a lookup in progress, each mapped to the `Notify` its leading task
    /// signals when it finishes.
    in_flight: Arc<FastMutex<HashMap<Uuid, Arc<Notify>>>>,
}
/// Drop guard owned by the task currently resolving a connection (the "leader").
///
/// Whenever it is dropped — normal completion or cancellation of the leader's future — it
/// removes its own registration from the shared in-flight map. It then wakes every task
/// waiting on its `Notify`.
#[derive(Debug)]
struct InFlightLeaderGuard {
    in_flight: Arc<FastMutex<HashMap<Uuid, Arc<Notify>>>>,
    connection_id: Uuid,
    notify: Arc<Notify>,
}

impl InFlightLeaderGuard {
    /// Creates a guard for the registration `connection_id → notify` in `in_flight`.
    fn new(
        in_flight: Arc<FastMutex<HashMap<Uuid, Arc<Notify>>>>,
        connection_id: Uuid,
        notify: Arc<Notify>,
    ) -> Self {
        InFlightLeaderGuard {
            in_flight,
            connection_id,
            notify,
        }
    }
}

impl Drop for InFlightLeaderGuard {
    fn drop(&mut self) {
        {
            let mut registry = self.in_flight.lock();
            // Defensive: only deregister if the entry still points at this guard's `Notify`.
            let still_ours = matches!(
                registry.get(&self.connection_id),
                Some(registered) if Arc::ptr_eq(registered, &self.notify)
            );
            if still_ours {
                registry.remove(&self.connection_id);
            }
        }
        // The map lock is released before waiters are woken.
        self.notify.notify_waiters();
    }
}
impl<A: ProcessAttributor> ProcessLookupService<A> {
pub(crate) fn new_with_cache(
attributor: Arc<A>,
timeout: Duration,
cache_capacity: usize,
cache_ttl: Option<Duration>,
) -> Self {
let capacity = NonZeroUsize::new(cache_capacity.max(1))
.expect("process cache capacity must be non-zero");
Self {
attributor,
timeout,
cache_ttl,
connection_cache: Mutex::new(LruCache::new(capacity)),
identity_cache: Mutex::new(LruCache::new(capacity)),
pid_start_tokens: Mutex::new(LruCache::new(capacity)),
in_flight: Arc::new(FastMutex::new(HashMap::new())),
}
}
pub(crate) async fn resolve_with_status(
&self,
connection: &ConnectionInfo,
) -> ProcessLookupResult {
let connection_id = connection.connection_id;
if let Some(cached) = self.cached_connection_result(connection_id).await {
return Self::result_from_cached_connection(cached);
}
loop {
let (leader, notify) = {
let mut in_flight = self.in_flight.lock();
if let Some(existing) = in_flight.get(&connection_id) {
(false, Arc::clone(existing))
} else {
let notify = Arc::new(Notify::new());
in_flight.insert(connection_id, Arc::clone(¬ify));
(true, notify)
}
};
if leader {
let _leader_guard = InFlightLeaderGuard::new(
Arc::clone(&self.in_flight),
connection_id,
Arc::clone(¬ify),
);
return self.resolve_miss_and_cache(connection_id, connection).await;
}
notify.notified().await;
if let Some(cached) = self.cached_connection_result(connection_id).await {
return Self::result_from_cached_connection(cached);
}
}
}
async fn cached_connection_result(&self, connection_id: Uuid) -> Option<CachedLookupResult> {
let mut cache = self.connection_cache.lock().await;
cache_get_if_fresh(&mut cache, &connection_id, self.cache_ttl).map(|entry| entry.value)
}
fn result_from_cached_connection(cached: CachedLookupResult) -> ProcessLookupResult {
ProcessLookupResult {
process_info: cached.process_info,
timed_out: cached.timed_out,
cache_path: ProcessCachePath::ConnectionHit,
pid_reuse_detected: false,
cache_evictions: 0,
}
}
async fn resolve_miss_and_cache(
&self,
connection_id: Uuid,
connection: &ConnectionInfo,
) -> ProcessLookupResult {
let outcome =
match tokio::time::timeout(self.timeout, self.resolve_uncached(connection)).await {
Ok(outcome) => outcome,
Err(_) => {
let evictions = self
.cache_connection_result(
connection_id,
CachedLookupResult {
process_info: None,
timed_out: true,
},
)
.await;
return ProcessLookupResult {
process_info: None,
timed_out: true,
cache_path: ProcessCachePath::Miss,
pid_reuse_detected: false,
cache_evictions: evictions,
};
}
};
let connection_evictions = self
.cache_connection_result(
connection_id,
CachedLookupResult {
process_info: outcome.process_info.clone(),
timed_out: false,
},
)
.await;
ProcessLookupResult {
process_info: outcome.process_info,
timed_out: false,
cache_path: outcome.cache_path,
pid_reuse_detected: outcome.pid_reuse_detected,
cache_evictions: outcome.cache_evictions + connection_evictions,
}
}
async fn resolve_uncached(&self, connection: &ConnectionInfo) -> UncachedLookupOutcome {
let mut cache_evictions = 0;
let mut pid_reuse_detected = false;
if let Some(identity) = self.attributor.lookup_identity(connection).await {
let (reused, pid_token_evictions) = self.register_pid_start_token(&identity).await;
pid_reuse_detected = reused;
cache_evictions += pid_token_evictions;
let identity_cached = {
let mut cache = self.identity_cache.lock().await;
cache_get_if_fresh(&mut cache, &identity, self.cache_ttl).map(|entry| entry.value)
};
if let Some(process_info) = identity_cached {
return UncachedLookupOutcome {
process_info: Some(process_info),
cache_path: ProcessCachePath::IdentityHit,
pid_reuse_detected,
cache_evictions,
};
}
let mut resolved = self.attributor.lookup_by_identity(&identity).await;
if resolved.is_none() {
resolved = self.attributor.lookup(connection).await;
}
if let Some(process_info) = resolved.as_ref() {
cache_evictions += self
.cache_identity_result(identity, process_info.clone())
.await;
}
return UncachedLookupOutcome {
process_info: resolved,
cache_path: ProcessCachePath::Miss,
pid_reuse_detected,
cache_evictions,
};
}
UncachedLookupOutcome {
process_info: self.attributor.lookup(connection).await,
cache_path: ProcessCachePath::Miss,
pid_reuse_detected,
cache_evictions,
}
}
async fn cache_connection_result(
&self,
connection_id: Uuid,
result: CachedLookupResult,
) -> u32 {
cache_push_count_eviction(
&self.connection_cache,
connection_id,
CacheEntry::new(result),
)
.await
}
async fn cache_identity_result(
&self,
identity: ProcessIdentity,
process_info: ProcessInfo,
) -> u32 {
cache_push_count_eviction(
&self.identity_cache,
identity,
CacheEntry::new(process_info),
)
.await
}
async fn register_pid_start_token(&self, identity: &ProcessIdentity) -> (bool, u32) {
let mut cache = self.pid_start_tokens.lock().await;
let pid_reuse_detected = cache_get_if_fresh(&mut cache, &identity.pid, self.cache_ttl)
.map(|entry| entry.value != identity.start_token)
.unwrap_or(false);
let evicted = if cache
.push(identity.pid, CacheEntry::new(identity.start_token.clone()))
.is_some()
{
1
} else {
0
};
(pid_reuse_detected, evicted)
}
pub(crate) async fn remove_connection(&self, connection_id: Uuid) {
let mut cache = self.connection_cache.lock().await;
let _ = cache.pop(&connection_id);
}
}
/// Inserts `key → value` into the LRU cache behind `cache` and reports whether a *different*
/// entry was evicted to stay within capacity (`1`) or not (`0`).
///
/// Overwriting the value of a key that is already present is not an eviction, even though
/// `LruCache::push` returns the old pair in that case too.
async fn cache_push_count_eviction<K, V>(cache: &Mutex<LruCache<K, V>>, key: K, value: V) -> u32
where
    K: Eq + Hash,
{
    let mut cache = cache.lock().await;
    // `contains` does not update recency order, so this probe has no side effects.
    let replacing_existing = cache.contains(&key);
    let displaced = cache.push(key, value);
    u32::from(displaced.is_some() && !replacing_existing)
}
/// Returns a clone of the entry stored under `key` if it has not outlived `cache_ttl`.
///
/// With `cache_ttl == None` entries never expire. Otherwise, an entry whose age has reached
/// the TTL is removed from `cache` and `None` is returned. A lookup marks the entry as most
/// recently used before the age check.
fn cache_get_if_fresh<K, V>(
    cache: &mut LruCache<K, CacheEntry<V>>,
    key: &K,
    cache_ttl: Option<Duration>,
) -> Option<CacheEntry<V>>
where
    K: Eq + Hash,
    V: Clone,
{
    let found = cache.get(key)?.clone();
    let expired = match cache_ttl {
        Some(ttl) => found.cached_at.elapsed() >= ttl,
        None => false,
    };
    if expired {
        let _ = cache.pop(key);
        return None;
    }
    Some(found)
}
/// Maximum number of ancestor processes examined by `derive_identity_walking_parents`.
const MAX_PARENT_DEPTH: usize = 5;
/// Executable names that `derive_identity_walking_parents` treats as script runtimes.
const RUNTIME_BINARIES: &[&str] = &[
    "node", "nodejs", "deno", "bun", "ts-node", "python", "python3", "python2", "ruby", "perl",
    "php",
];
/// Returns `true` when `name` exactly (case-sensitively) matches an entry of
/// `RUNTIME_BINARIES`.
fn is_runtime_binary(name: &str) -> bool {
    RUNTIME_BINARIES.iter().any(|&runtime| runtime == name)
}
pub(super) fn derive_identity_walking_parents(
pid: u32,
exe_filename: Option<&str>,
read_args: &dyn Fn(u32) -> Option<Vec<String>>,
read_parent_pid: &dyn Fn(u32) -> Option<u32>,
) -> Option<String> {
if !exe_filename.map(is_runtime_binary).unwrap_or(false) {
return None;
}
let own_args = read_args(pid);
if let Some(ref args) = own_args {
if let Some(pkg) = find_package_in_args(args) {
return Some(pkg);
}
}
let mut current = pid;
for _ in 0..MAX_PARENT_DEPTH {
let parent = match read_parent_pid(current) {
Some(ppid) if ppid > 1 => ppid,
_ => break,
};
if let Some(args) = read_args(parent) {
if let Some(pkg) = find_package_in_args(&args) {
return Some(pkg);
}
}
current = parent;
}
if let Some(ref args) = own_args {
if let Some(name) = find_script_name_in_args(args) {
return Some(name);
}
}
None
}
/// Looks for an npm package name in the first non-flag argument after the program name.
///
/// Arguments starting with `-` are skipped; only the first remaining argument is examined.
fn find_package_in_args(args: &[String]) -> Option<String> {
    let script = args.iter().skip(1).find(|arg| !arg.starts_with('-'))?;
    extract_node_package_name(script)
}
/// Returns the file stem of the first non-flag argument after the program name, provided
/// that argument looks like a path.
///
/// An argument looks like a path if it has an extension or contains `/` or `\`. Otherwise,
/// or when the stem is empty or not UTF-8, returns `None`.
fn find_script_name_in_args(args: &[String]) -> Option<String> {
    let candidate = args.iter().skip(1).find(|arg| !arg.starts_with('-'))?;
    let path = std::path::Path::new(candidate.as_str());
    let has_separator = candidate.contains('/') || candidate.contains('\\');
    if path.extension().is_none() && !has_separator {
        return None;
    }
    path.file_stem()
        .and_then(|stem| stem.to_str())
        .filter(|stem| !stem.is_empty())
        .map(str::to_string)
}
/// Extracts the npm package name from a script path that runs out of a `node_modules` tree.
///
/// Both `/` and `\` are accepted as separators. Each `node_modules` segment is tried in
/// order, and the first one followed by a plausible package name wins:
/// - `node_modules/@scope/name/...` yields `"@scope/name"`;
/// - `node_modules/name/...` yields `"name"`.
///
/// Segments that are empty or start with `.` are not package names (npm forbids a leading
/// dot). Skipping them rejects tool directories such as `node_modules/.bin` (bin shims) and
/// `node_modules/.pnpm` (pnpm's store), whose real package, if any, follows a later
/// `node_modules` segment. Returns `None` when no candidate qualifies.
fn extract_node_package_name(script_path: &str) -> Option<String> {
    /// Plausible package-name segment: non-empty and not dot-prefixed.
    fn is_plausible(segment: &str) -> bool {
        !segment.is_empty() && !segment.starts_with('.')
    }

    /// Package name at the start of `rest` (the segments after one `node_modules`).
    fn package_at(rest: &[&str]) -> Option<String> {
        let first = *rest.first()?;
        if first.starts_with('@') {
            let name = *rest.get(1)?;
            if is_plausible(name) {
                Some(format!("{}/{}", first, name))
            } else {
                None
            }
        } else if is_plausible(first) {
            Some(first.to_string())
        } else {
            None
        }
    }

    let normalized = script_path.replace('\\', "/");
    let parts: Vec<&str> = normalized.split('/').collect();
    (0..parts.len())
        .filter(|&idx| parts[idx] == "node_modules")
        .find_map(|idx| package_at(&parts[idx + 1..]))
}
/// Unit tests for the argv / `node_modules` identity helpers.
#[cfg(test)]
mod identity_tests {
    use super::*;

    /// Builds an owned argument vector from string slices.
    fn to_argv(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    #[test]
    fn extracts_scoped_package_from_node_modules_path() {
        let script = concat!("/usr/local/lib/node_modules/@open", "ai/codex/bin/codex.js");
        let expected = Some(concat!("@open", "ai/codex").to_string());
        assert_eq!(extract_node_package_name(script), expected);
    }

    #[test]
    fn extracts_unscoped_package_from_node_modules_path() {
        let script = "/project/node_modules/express/lib/express.js";
        let expected = Some("express".to_string());
        assert_eq!(extract_node_package_name(script), expected);
    }

    #[test]
    fn returns_none_when_no_node_modules_in_path() {
        assert!(extract_node_package_name("/usr/bin/curl").is_none());
    }

    #[test]
    fn identifies_node_as_runtime() {
        let runtimes = ["node", "python3"];
        let non_runtimes = ["curl", "Google Chrome"];
        assert!(runtimes.iter().all(|name| is_runtime_binary(name)));
        assert!(!non_runtimes.iter().any(|name| is_runtime_binary(name)));
    }

    #[test]
    fn find_package_skips_flags() {
        let args = to_argv(&[
            "node",
            "--inspect",
            concat!("/x/node_modules/@anthro", "pic-ai/clau", "de-code/cli.js"),
        ]);
        let expected = Some(concat!("@anthro", "pic-ai/clau", "de-code").to_string());
        assert_eq!(find_package_in_args(&args), expected);
    }

    #[test]
    fn find_script_name_extracts_basename() {
        let args = to_argv(&["node", "/home/user/worker.js"]);
        assert_eq!(find_script_name_in_args(&args).as_deref(), Some("worker"));
    }

    #[test]
    fn find_script_name_skips_non_path_args() {
        let args = to_argv(&["node", "eval-code"]);
        assert!(find_script_name_in_args(&args).is_none());
    }

    #[test]
    fn derive_identity_returns_none_for_non_runtime() {
        let no_args = |_: u32| -> Option<Vec<String>> { None };
        let no_parent = |_: u32| -> Option<u32> { None };
        let identity = derive_identity_walking_parents(100, Some("curl"), &no_args, &no_parent);
        assert!(identity.is_none());
    }

    #[test]
    fn derive_identity_extracts_package_from_current_process() {
        let read_args = |pid: u32| -> Option<Vec<String>> {
            match pid {
                100 => Some(to_argv(&[
                    "node",
                    concat!("/lib/node_modules/@open", "ai/codex/bin/codex.js"),
                ])),
                _ => None,
            }
        };
        let read_parent = |_: u32| -> Option<u32> { Some(1) };
        let identity =
            derive_identity_walking_parents(100, Some("node"), &read_args, &read_parent);
        assert_eq!(identity, Some(concat!("@open", "ai/codex").to_string()));
    }

    #[test]
    fn derive_identity_walks_parents_for_package() {
        let read_args = |pid: u32| -> Option<Vec<String>> {
            match pid {
                200 => Some(to_argv(&["node", "worker.js"])),
                150 => Some(to_argv(&[
                    "node",
                    concat!("/lib/node_modules/@open", "ai/codex/bin/codex.js"),
                ])),
                _ => None,
            }
        };
        let read_parent = |pid: u32| -> Option<u32> {
            match pid {
                200 => Some(150),
                150 => Some(1),
                _ => None,
            }
        };
        let identity =
            derive_identity_walking_parents(200, Some("node"), &read_args, &read_parent);
        assert_eq!(identity, Some(concat!("@open", "ai/codex").to_string()));
    }

    #[test]
    fn derive_identity_falls_back_to_script_name() {
        let read_args = |pid: u32| -> Option<Vec<String>> {
            match pid {
                300 => Some(to_argv(&["node", "/app/server.js"])),
                _ => None,
            }
        };
        let read_parent = |_: u32| -> Option<u32> { Some(1) };
        let identity =
            derive_identity_walking_parents(300, Some("node"), &read_args, &read_parent);
        assert_eq!(identity.as_deref(), Some("server"));
    }
}
#[cfg(test)]
mod tests;