#[cfg(not(target_arch = "wasm32"))]
use std::fmt;
#[cfg(any(test, not(target_arch = "wasm32")))]
use std::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::OnceLock;
#[cfg(any(test, not(target_arch = "wasm32")))]
use crate::error::FlowError;
use crate::error::Result;
#[cfg(not(target_arch = "wasm32"))]
const BINDING_KIND_ENV: &str = "NEMO_FLOW_BINDING_KIND";
#[cfg(not(target_arch = "wasm32"))]
const OWNER_TOKEN_ENV: &str = "NEMO_FLOW_RUNTIME_OWNER";
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq)]
struct RuntimeOwner {
pid: u32,
binding_kind: String,
major_version: String,
}
#[cfg(not(target_arch = "wasm32"))]
impl RuntimeOwner {
fn current(binding_kind: String) -> Result<Self> {
Ok(Self {
pid: std::process::id(),
binding_kind,
major_version: current_compatibility_version()?.to_string(),
})
}
fn parse(token: &str) -> Result<Self> {
let mut pid = None;
let mut binding_kind = None;
let mut version = None;
for field in token.split(';') {
if let Some(value) = field.strip_prefix("pid=") {
pid = Some(value.parse::<u32>().map_err(|e| {
FlowError::Internal(
format!("invalid NeMo Flow owner token pid {value:?}: {e}",),
)
})?);
} else if let Some(value) = field.strip_prefix("binding=") {
if value.is_empty() {
return Err(FlowError::Internal(
"invalid NeMo Flow owner token: binding kind is empty".into(),
));
}
binding_kind = Some(value.to_string());
} else if let Some(value) = field.strip_prefix("version=") {
version = Some(compatibility_major_version(value)?.to_string());
}
}
Ok(Self {
pid: pid.ok_or_else(|| {
FlowError::Internal("invalid NeMo Flow owner token: missing pid".into())
})?,
binding_kind: binding_kind.ok_or_else(|| {
FlowError::Internal("invalid NeMo Flow owner token: missing binding".into())
})?,
major_version: version.ok_or_else(|| {
FlowError::Internal("invalid NeMo Flow owner token: missing version".into())
})?,
})
}
fn token(&self) -> String {
format!(
"pid={};binding={};version={}",
self.pid, self.binding_kind, self.major_version
)
}
fn same_owner(&self, other: &Self) -> bool {
self.pid == other.pid
&& self.binding_kind == other.binding_kind
&& self.major_version == other.major_version
}
}
#[cfg(not(target_arch = "wasm32"))]
impl fmt::Display for RuntimeOwner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}@{} pid={}",
self.binding_kind, self.major_version, self.pid
)
}
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Default)]
struct RuntimeOwnerController {
binding_kind: Option<String>,
}
#[cfg(not(target_arch = "wasm32"))]
static RUNTIME_OWNER_CONTROLLER: OnceLock<Mutex<RuntimeOwnerController>> = OnceLock::new();
#[cfg(not(target_arch = "wasm32"))]
fn runtime_owner_controller() -> &'static Mutex<RuntimeOwnerController> {
RUNTIME_OWNER_CONTROLLER.get_or_init(|| Mutex::new(RuntimeOwnerController::default()))
}
#[cfg(any(test, not(target_arch = "wasm32")))]
fn compatibility_major_version(version: &str) -> Result<&str> {
version
.split('.')
.next()
.filter(|value| !value.is_empty() && value.chars().all(|c| c.is_ascii_digit()))
.ok_or_else(|| {
FlowError::Internal(format!(
"invalid NeMo Flow version {version:?}: expected a semver-compatible major",
))
})
}
#[cfg(any(test, not(target_arch = "wasm32")))]
fn current_compatibility_version() -> Result<&'static str> {
compatibility_major_version(env!("CARGO_PKG_VERSION"))
}
#[cfg(not(target_arch = "wasm32"))]
fn resolve_binding_kind(binding_kind: Option<String>) -> String {
binding_kind
.or_else(|| std::env::var(BINDING_KIND_ENV).ok())
.unwrap_or_else(|| "rust".to_string())
}
#[cfg(not(target_arch = "wasm32"))]
fn read_process_runtime_owner() -> Result<Option<RuntimeOwner>> {
let Some(token) = std::env::var(OWNER_TOKEN_ENV)
.ok()
.filter(|value| !value.is_empty())
else {
return Ok(None);
};
match RuntimeOwner::parse(&token) {
Ok(owner) => Ok(Some(owner)),
Err(_) => {
clear_process_runtime_owner();
Ok(None)
}
}
}
#[cfg(not(target_arch = "wasm32"))]
fn publish_process_runtime_owner(owner: &RuntimeOwner) {
unsafe { std::env::set_var(OWNER_TOKEN_ENV, owner.token()) };
}
#[cfg(not(target_arch = "wasm32"))]
fn clear_process_runtime_owner() {
unsafe { std::env::remove_var(OWNER_TOKEN_ENV) };
}
#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
pub fn initialize_shared_runtime_binding(binding_kind: &str) -> Result<()> {
let previous_binding_kind = {
let controller = runtime_owner_controller();
let mut guard = controller.lock().map_err(|e| {
FlowError::Internal(format!("runtime owner controller lock poisoned: {e}"))
})?;
if let Some(existing) = guard.binding_kind.as_deref()
&& existing != binding_kind
{
return Err(FlowError::InvalidArgument(format!(
"NeMo Flow binding identity is already initialized as {existing}; attempted={binding_kind}",
)));
}
let previous = guard.binding_kind.clone();
guard
.binding_kind
.get_or_insert_with(|| binding_kind.to_string());
previous
};
if let Err(error) = ensure_process_runtime_owner() {
if previous_binding_kind.is_none() {
let controller = runtime_owner_controller();
let mut guard = controller.lock().map_err(|e| {
FlowError::Internal(format!("runtime owner controller lock poisoned: {e}"))
})?;
if guard.binding_kind.as_deref() == Some(binding_kind) {
guard.binding_kind = None;
}
}
return Err(error);
}
Ok(())
}
#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
pub fn initialize_shared_runtime_binding(_binding_kind: &str) -> Result<()> {
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn ensure_process_runtime_owner() -> Result<()> {
let binding_kind = {
let controller = runtime_owner_controller();
let guard = controller.lock().map_err(|e| {
FlowError::Internal(format!("runtime owner controller lock poisoned: {e}"))
})?;
resolve_binding_kind(guard.binding_kind.clone())
};
let current = RuntimeOwner::current(binding_kind)?;
match read_process_runtime_owner()? {
Some(existing) if existing.same_owner(¤t) => Ok(()),
Some(existing) if existing.pid != current.pid => {
publish_process_runtime_owner(¤t);
Ok(())
}
Some(existing) => Err(FlowError::InvalidArgument(format!(
"NeMo Flow does not support multiple bindings in one process; existing owner={} attempted={}",
existing, current
))),
None => {
publish_process_runtime_owner(¤t);
Ok(())
}
}
}
#[cfg(target_arch = "wasm32")]
pub(crate) fn ensure_process_runtime_owner() -> Result<()> {
Ok(())
}
#[cfg(test)]
static TEST_MUTEX: Mutex<()> = Mutex::new(());
#[cfg(test)]
pub(crate) fn runtime_owner_test_mutex() -> &'static Mutex<()> {
&TEST_MUTEX
}
#[cfg(test)]
pub(crate) fn reset_runtime_owner_for_tests() {
#[cfg(target_arch = "wasm32")]
{
return;
}
#[cfg(not(target_arch = "wasm32"))]
{
clear_process_runtime_owner();
let controller = runtime_owner_controller();
let mut guard = controller.lock().unwrap();
guard.binding_kind = None;
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "../tests/coverage/shared_runtime_tests.rs"]
mod tests;