use core::sync::atomic::AtomicU64;
use core::sync::atomic::Ordering;
use std::collections::HashSet;
use std::ffi::CString;
use pyo3::Bound;
use pyo3::Python;
use pyo3::types::PyAny;
use pyo3::types::PyAnyMethods;
use pyo3::types::PyDict;
use pyo3::types::PyDictMethods;
use pyo3::types::PyModule;
use polyplug::error::LoaderError;
const ISOLATION_HELPER_PY: &str = r#"
import os
import sys
def isolate(prefix, bundle_dir, before):
bundle_dir = os.path.realpath(bundle_dir)
before = set(before)
to_move = []
for name in list(sys.modules.keys()):
if name in before:
continue
module = sys.modules.get(name)
if module is None:
continue
file = getattr(module, "__file__", None)
try:
search_paths = list(getattr(module, "__path__", []) or [])
except Exception:
# A namespace package whose parent was already moved can raise while
# recalculating its path; treat it as in-bundle so it is re-keyed too.
search_paths = [bundle_dir]
under_bundle = False
if file is not None and os.path.realpath(file).startswith(bundle_dir):
under_bundle = True
else:
for search_path in search_paths:
if os.path.realpath(str(search_path)).startswith(bundle_dir):
under_bundle = True
break
if under_bundle:
to_move.append(name)
# Re-key children before parents so namespace-package path recalculation
# never observes a half-moved tree.
to_move.sort(key=len, reverse=True)
moved = []
for name in to_move:
module = sys.modules[name]
sys.modules[prefix + "." + name] = module
del sys.modules[name]
moved.append(name)
return moved
"#;
pub(crate) fn snapshot_loaded_modules(
py: Python<'_>,
bundle_name: &str,
) -> Result<HashSet<String>, LoaderError> {
let sys_mod: Bound<'_, PyModule> =
PyModule::import(py, "sys").map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("Python sys import failed: {}", e),
})?;
let modules: Bound<'_, PyAny> =
sys_mod
.getattr("modules")
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("sys.modules access failed: {}", e),
})?;
let keys_view: Bound<'_, PyAny> =
modules
.call_method0("keys")
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("sys.modules.keys() failed: {}", e),
})?;
let keys: Bound<'_, PyAny> = py
.get_type::<pyo3::types::PyList>()
.call1((keys_view,))
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("sys.modules keys materialization failed: {}", e),
})?;
let names: Vec<String> = keys
.extract()
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("sys.modules keys extraction failed: {}", e),
})?;
Ok(names.into_iter().collect())
}
static ISOLATION_NONCE: AtomicU64 = AtomicU64::new(0);
fn next_isolation_prefix(bundle_id: u64) -> String {
let nonce: u64 = ISOLATION_NONCE.fetch_add(1, Ordering::Relaxed);
format!("__polyplug_bundle_{:016X}_{:016X}__", bundle_id, nonce)
}
pub(crate) fn isolate_bundle_modules(
py: Python<'_>,
bundle_name: &str,
bundle_id: u64,
bundle_dir: &str,
before: &HashSet<String>,
) -> Result<String, LoaderError> {
let helper_code: CString =
CString::new(ISOLATION_HELPER_PY).map_err(|e: std::ffi::NulError| {
LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("isolation helper contained interior nul: {}", e),
}
})?;
let file_name: CString =
CString::new("polyplug_python_isolation.py").map_err(|e: std::ffi::NulError| {
LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("isolation file name contained interior nul: {}", e),
}
})?;
let module_name: CString =
CString::new("polyplug_python_isolation").map_err(|e: std::ffi::NulError| {
LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("isolation module name contained interior nul: {}", e),
}
})?;
let helper: Bound<'_, PyModule> =
PyModule::from_code(py, &helper_code, &file_name, &module_name).map_err(
|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("isolation helper compile failed: {}", e),
},
)?;
let prefix: String = next_isolation_prefix(bundle_id);
let before_list: Vec<&str> = before.iter().map(|s: &String| s.as_str()).collect();
helper
.getattr("isolate")
.and_then(|isolate: Bound<'_, PyAny>| {
isolate.call1((prefix.as_str(), bundle_dir, before_list))
})
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("module isolation failed: {}", e),
})?;
let sys_mod: Bound<'_, PyModule> =
PyModule::import(py, "sys").map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("Python sys import failed: {}", e),
})?;
let modules: Bound<'_, PyAny> =
sys_mod
.getattr("modules")
.map_err(|e: pyo3::PyErr| LoaderError::InitFailed {
bundle: bundle_name.to_owned(),
error: format!("sys.modules access failed: {}", e),
})?;
if let Ok(dict) = modules.cast::<PyDict>() {
let _ = dict.del_item("polyplug_python_isolation");
}
Ok(prefix)
}
#[cfg(test)]
mod tests {
use super::next_isolation_prefix;
use std::collections::HashSet;
#[test]
fn same_bundle_id_yields_distinct_prefixes() {
let bundle_id: u64 = 0xDEAD_BEEF_CAFE_0001;
let first: String = next_isolation_prefix(bundle_id);
let second: String = next_isolation_prefix(bundle_id);
assert_ne!(
first, second,
"identical bundle_id must still produce unique prefixes via the nonce"
);
}
#[test]
fn prefixes_are_pairwise_unique() {
let mut seen: HashSet<String> = HashSet::new();
let ids: [u64; 4] = [1, 1, 2, 1];
for &id in ids.iter() {
for _ in 0..16 {
let prefix: String = next_isolation_prefix(id);
assert!(
seen.insert(prefix.clone()),
"prefix collision detected: {}",
prefix
);
}
}
}
#[test]
fn prefix_embeds_bundle_id_and_has_expected_shape() {
let bundle_id: u64 = 0x0123_4567_89AB_CDEF;
let prefix: String = next_isolation_prefix(bundle_id);
assert!(prefix.starts_with("__polyplug_bundle_0123456789ABCDEF_"));
assert!(prefix.ends_with("__"));
}
}