use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList};
use lerna::config::interpolation::{resolve, ResolverContext};
use lerna::config::value::{ConfigDict, ConfigValue};
use lerna::config::{
CachingConfigRepository, ConfigRepository as RustConfigRepository,
SearchPathElement as RustSearchPathElement,
};
use lerna::config::{ConfigLoader as RustConfigLoader, SearchPathEntry as RustSearchPathEntry};
use lerna::ObjectType;
/// Converts a [`ConfigValue`] tree into the equivalent Python object.
///
/// `Null` becomes `None`; booleans, integers, floats and strings become the matching Python
/// scalars. An interpolation is emitted as its raw string and a missing value as the literal
/// string `"???"`. Lists and dictionaries are converted recursively.
///
/// # Errors
/// Propagates any `PyErr` raised while building the Python objects.
fn config_value_to_py(py: Python, value: &ConfigValue) -> PyResult<Py<PyAny>> {
    let converted = match value {
        ConfigValue::Null => py.None(),
        ConfigValue::Bool(flag) => (*flag).into_pyobject(py)?.to_owned().into_any().unbind(),
        ConfigValue::Int(number) => (*number).into_pyobject(py)?.to_owned().into_any().unbind(),
        ConfigValue::Float(number) => (*number).into_pyobject(py)?.to_owned().into_any().unbind(),
        ConfigValue::String(text) => {
            text.as_str().into_pyobject(py)?.to_owned().into_any().unbind()
        }
        ConfigValue::Interpolation(expr) => {
            expr.as_str().into_pyobject(py)?.to_owned().into_any().unbind()
        }
        ConfigValue::Missing => "???".into_pyobject(py)?.to_owned().into_any().unbind(),
        ConfigValue::List(items) => {
            let py_list = PyList::empty(py);
            for element in items {
                let py_element = config_value_to_py(py, element)?;
                py_list.append(py_element)?;
            }
            py_list.into_any().unbind()
        }
        ConfigValue::Dict(mapping) => config_dict_to_py(py, mapping)?,
    };
    Ok(converted)
}
/// Converts a [`ConfigDict`] into a Python `dict`, converting every value with
/// [`config_value_to_py`].
///
/// # Errors
/// Propagates any `PyErr` raised while converting a value or inserting it.
fn config_dict_to_py(py: Python, dict: &ConfigDict) -> PyResult<Py<PyAny>> {
    let target = PyDict::new(py);
    for (name, entry) in dict.iter() {
        let py_entry = config_value_to_py(py, entry)?;
        target.set_item(name, py_entry)?;
    }
    Ok(target.into_any().unbind())
}
/// Converts a Python object into a [`ConfigValue`].
///
/// Conversions are tried in this order: `None`, `bool`, `i64`, `f64`, `str`, `list`, `dict`.
/// The string `"???"` maps to [`ConfigValue::Missing`]. Anything that matches none of these is
/// stored as the string produced by Python's `str()`.
///
/// Dictionary keys that are not Python strings are converted with `str()` as well, so such
/// entries are kept rather than silently discarded.
///
/// # Errors
/// Propagates any `PyErr` raised by `str()` or by a nested conversion.
fn py_to_config_value(py: Python, obj: &Bound<'_, PyAny>) -> PyResult<ConfigValue> {
    if obj.is_none() {
        return Ok(ConfigValue::Null);
    }
    // `bool` is probed before `i64` so Python booleans are not captured by the integer branch.
    if let Ok(flag) = obj.extract::<bool>() {
        return Ok(ConfigValue::Bool(flag));
    }
    if let Ok(number) = obj.extract::<i64>() {
        return Ok(ConfigValue::Int(number));
    }
    if let Ok(number) = obj.extract::<f64>() {
        return Ok(ConfigValue::Float(number));
    }
    if let Ok(text) = obj.extract::<String>() {
        return Ok(if text == "???" {
            ConfigValue::Missing
        } else {
            ConfigValue::String(text)
        });
    }
    if let Ok(list) = obj.cast::<PyList>() {
        let mut items = Vec::new();
        for item in list.iter() {
            items.push(py_to_config_value(py, &item)?);
        }
        return Ok(ConfigValue::List(items));
    }
    if let Ok(dict) = obj.cast::<PyDict>() {
        let mut config_dict = ConfigDict::new();
        for (key, value) in dict.iter() {
            // Non-string keys used to be dropped together with their value; stringify them
            // instead, mirroring the scalar fallback at the end of this function.
            let name = match key.extract::<String>() {
                Ok(name) => name,
                Err(_) => key.str()?.to_string(),
            };
            config_dict.insert(name, py_to_config_value(py, &value)?);
        }
        return Ok(ConfigValue::Dict(config_dict));
    }
    Ok(ConfigValue::String(obj.str()?.to_string()))
}
/// Python-exposed `SearchPathEntry` class holding a provider string and a path string.
#[pyclass(name = "SearchPathEntry")]
#[derive(Clone)]
pub struct PySearchPathEntry {
/// Provider string; passed as the first argument when converting to the Rust entry type.
pub provider: String,
/// Path string; passed as the second argument when converting to the Rust entry type.
pub path: String,
}
#[pymethods]
impl PySearchPathEntry {
#[new]
fn new(provider: String, path: String) -> Self {
Self { provider, path }
}
#[getter]
fn provider(&self) -> &str {
&self.provider
}
#[getter]
fn path(&self) -> &str {
&self.path
}
fn __repr__(&self) -> String {
format!(
"SearchPathEntry(provider='{}', path='{}')",
self.provider, self.path
)
}
}
impl From<&PySearchPathEntry> for RustSearchPathEntry {
fn from(entry: &PySearchPathEntry) -> Self {
RustSearchPathEntry::new(&entry.provider, &entry.path)
}
}
/// Python-exposed `ConfigLoader` class wrapping the Rust config loader.
#[pyclass(name = "ConfigLoader")]
pub struct PyConfigLoader {
// The wrapped Rust loader; every method of the Python class delegates to it.
loader: RustConfigLoader,
}
#[pymethods]
impl PyConfigLoader {
#[new]
#[pyo3(signature = (search_paths=None, config_dir=None))]
fn new(
search_paths: Option<Vec<PySearchPathEntry>>,
config_dir: Option<String>,
) -> PyResult<Self> {
let loader = if let Some(dir) = config_dir {
RustConfigLoader::from_config_dir(&dir)
} else if let Some(paths) = search_paths {
let rust_paths: Vec<RustSearchPathEntry> = paths.iter().map(|p| p.into()).collect();
RustConfigLoader::new(rust_paths)
} else {
return Err(PyRuntimeError::new_err(
"Either search_paths or config_dir must be provided",
));
};
Ok(Self { loader })
}
#[staticmethod]
fn from_config_dir(config_dir: &str) -> Self {
Self {
loader: RustConfigLoader::from_config_dir(config_dir),
}
}
#[pyo3(signature = (config_name=None, overrides=None))]
fn load_config(
&self,
py: Python,
config_name: Option<&str>,
overrides: Option<Vec<String>>,
) -> PyResult<Py<PyAny>> {
let overrides_ref: Vec<String> = overrides.unwrap_or_default();
let config = self
.loader
.load_config(config_name, &overrides_ref)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
config_value_to_py(py, &config)
}
fn config_exists(&self, config_path: &str) -> bool {
self.loader.config_exists(config_path)
}
fn group_exists(&self, group_path: &str) -> bool {
self.loader.group_exists(group_path)
}
fn list_group(&self, group_path: &str) -> Vec<String> {
self.loader.list_group(group_path)
}
fn list_groups(&self, parent_path: &str) -> Vec<String> {
self.loader.list_groups(parent_path)
}
fn __repr__(&self) -> String {
format!("ConfigLoader(sources={})", self.loader.sources().len())
}
}
/// Parses YAML text with `lerna::config::parse_yaml` and returns the result as a Python object.
///
/// Raises `RuntimeError` carrying the parser's error message on failure.
#[pyfunction]
fn parse_yaml(py: Python, content: &str) -> PyResult<Py<PyAny>> {
    match lerna::config::parse_yaml(content) {
        Ok(parsed) => config_value_to_py(py, &parsed),
        Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
    }
}
/// Loads the YAML file at `path` with `lerna::config::load_yaml_file` and returns the result
/// as a Python object.
///
/// Raises `RuntimeError` carrying the loader's error message on failure.
#[pyfunction]
fn load_yaml_file(py: Python, path: &str) -> PyResult<Py<PyAny>> {
    let file = std::path::Path::new(path);
    match lerna::config::load_yaml_file(file) {
        Ok(loaded) => config_value_to_py(py, &loaded),
        Err(err) => Err(PyRuntimeError::new_err(err.to_string())),
    }
}
/// Resolves interpolations in a Python config and returns the resolved config as a Python
/// object.
///
/// The argument is converted to a [`ConfigValue`] first. It must convert to a dictionary,
/// which then serves as the root of the resolver context.
///
/// Raises `RuntimeError` when the config is not a dictionary or when resolution fails.
#[pyfunction]
fn resolve_interpolations(py: Python, config: Bound<'_, PyAny>) -> PyResult<Py<PyAny>> {
    let config_value = py_to_config_value(py, &config)?;
    // Borrow the root dictionary in place; the resolver context only needs a reference, so
    // there is no reason to deep-copy the whole tree.
    let ConfigValue::Dict(root) = &config_value else {
        return Err(PyRuntimeError::new_err("Config must be a dictionary"));
    };
    let ctx = ResolverContext::new(root);
    let resolved =
        resolve(&config_value, &ctx).map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
    config_value_to_py(py, &resolved)
}
/// Loads a config from `config_dir`, applies `overrides`, resolves interpolations and returns
/// the result as a Python object.
///
/// A missing `overrides` argument is treated as an empty list.
///
/// Raises `RuntimeError` when loading fails, when the loaded config is not a dictionary, or
/// when interpolation resolution fails.
#[pyfunction]
#[pyo3(signature = (config_dir, config_name=None, overrides=None))]
fn compose_config(
    py: Python,
    config_dir: &str,
    config_name: Option<&str>,
    overrides: Option<Vec<String>>,
) -> PyResult<Py<PyAny>> {
    let loader = RustConfigLoader::from_config_dir(config_dir);
    let overrides: Vec<String> = overrides.unwrap_or_default();
    let config = loader
        .load_config(config_name, &overrides)
        .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
    // Borrow the root dictionary in place; the resolver context only needs a reference, so
    // there is no reason to deep-copy the whole tree.
    let ConfigValue::Dict(root) = &config else {
        return Err(PyRuntimeError::new_err("Config must be a dictionary"));
    };
    let ctx = ResolverContext::new(root);
    let resolved = resolve(&config, &ctx).map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
    config_value_to_py(py, &resolved)
}
/// Extracts the header of `content` with `lerna::config::extract_header` and returns it as a
/// Python `dict`.
///
/// The returned dict always has a `"package"` key; it is set to `None` when the header does
/// not provide one.
#[pyfunction]
fn extract_header_dict(py: Python, content: &str) -> PyResult<Py<PyAny>> {
    let header = lerna::config::extract_header(content);
    let result = PyDict::new(py);
    for (name, entry) in header {
        result.set_item(name, entry)?;
    }
    let has_package = result.contains("package")?;
    if !has_package {
        result.set_item("package", py.None())?;
    }
    Ok(result.into_any().unbind())
}
/// Python-exposed `RustConfigRepository` class wrapping the Rust config repository.
#[pyclass(name = "RustConfigRepository")]
pub struct PyConfigRepository {
// The wrapped Rust repository; every method of the Python class delegates to it.
inner: RustConfigRepository,
}
#[pymethods]
impl PyConfigRepository {
    /// Creates a repository from `(provider, path)` pairs.
    #[new]
    fn new(search_paths: Vec<(String, String)>) -> Self {
        let mut elements: Vec<RustSearchPathElement> = Vec::with_capacity(search_paths.len());
        for (provider, path) in &search_paths {
            elements.push(RustSearchPathElement::new(provider, path));
        }
        let inner = RustConfigRepository::new(&elements);
        Self { inner }
    }

    /// Loads the config at `config_path` and returns its content as a Python object, or
    /// `None` when the repository reports no such config.
    ///
    /// Raises `RuntimeError` carrying the repository's error message on failure.
    fn load_config(&self, py: Python, config_path: &str) -> PyResult<Option<Py<PyAny>>> {
        let lookup = self
            .inner
            .load_config(config_path)
            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
        let Some(found) = lookup else {
            return Ok(None);
        };
        config_value_to_py(py, &found.config).map(Some)
    }

    /// Like `load_config`, but returns a dict with the keys `provider`, `path`, `config`,
    /// `is_schema_source` and `header` (itself a dict), or `None` when the repository reports
    /// no such config.
    ///
    /// Raises `RuntimeError` carrying the repository's error message on failure.
    fn load_config_full(&self, py: Python, config_path: &str) -> PyResult<Option<Py<PyAny>>> {
        let lookup = self
            .inner
            .load_config(config_path)
            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
        let Some(found) = lookup else {
            return Ok(None);
        };

        let payload = PyDict::new(py);
        payload.set_item("provider", &found.provider)?;
        payload.set_item("path", &found.path)?;
        payload.set_item("config", config_value_to_py(py, &found.config)?)?;
        payload.set_item("is_schema_source", found.is_schema_source)?;

        let header = PyDict::new(py);
        for (name, entry) in &found.header {
            header.set_item(name, entry)?;
        }
        payload.set_item("header", header)?;

        Ok(Some(payload.into_any().unbind()))
    }

    /// Forwards to the underlying repository's `group_exists`.
    fn group_exists(&self, config_path: &str) -> bool {
        self.inner.group_exists(config_path)
    }

    /// Forwards to the underlying repository's `config_exists`.
    fn config_exists(&self, config_path: &str) -> bool {
        self.inner.config_exists(config_path)
    }

    /// Lists the options of `group_name`.
    ///
    /// `results_filter` equal to `"group"` selects groups; every other value, including
    /// `None`, selects configs.
    #[pyo3(signature = (group_name, results_filter=None))]
    fn get_group_options(&self, group_name: &str, results_filter: Option<&str>) -> Vec<String> {
        let wanted = match results_filter {
            Some("group") => ObjectType::Group,
            _ => ObjectType::Config,
        };
        self.inner.get_group_options(group_name, Some(wanted))
    }

    /// Returns the number of sources held by the underlying repository.
    fn num_sources(&self) -> usize {
        self.inner.get_sources().len()
    }

    /// Returns `RustConfigRepository(sources=N)`.
    fn __repr__(&self) -> String {
        let source_count = self.num_sources();
        format!("RustConfigRepository(sources={})", source_count)
    }
}
/// Python-exposed `RustCachingConfigRepository` class wrapping the Rust caching repository.
#[pyclass(name = "RustCachingConfigRepository")]
pub struct PyCachingConfigRepository {
// The wrapped caching repository; it is built around a plain `RustConfigRepository`.
inner: CachingConfigRepository,
}
#[pymethods]
impl PyCachingConfigRepository {
    /// Creates a caching repository from `(provider, path)` pairs.
    #[new]
    fn new(search_paths: Vec<(String, String)>) -> Self {
        let elements: Vec<RustSearchPathElement> = search_paths
            .iter()
            .map(|(provider, path)| RustSearchPathElement::new(provider, path))
            .collect();
        let base_repo = RustConfigRepository::new(&elements);
        Self {
            inner: CachingConfigRepository::new(base_repo),
        }
    }

    /// Loads the config at `config_path` and returns its content as a Python object, or
    /// `None` when the repository reports no such config.
    ///
    /// Raises `RuntimeError` carrying the repository's error message on failure.
    fn load_config(&mut self, py: Python, config_path: &str) -> PyResult<Option<Py<PyAny>>> {
        let lookup = self
            .inner
            .load_config(config_path)
            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
        let Some(found) = lookup else {
            return Ok(None);
        };
        config_value_to_py(py, &found.config).map(Some)
    }

    /// Forwards to the underlying repository's `group_exists`.
    fn group_exists(&self, config_path: &str) -> bool {
        self.inner.group_exists(config_path)
    }

    /// Forwards to the underlying repository's `config_exists`.
    fn config_exists(&self, config_path: &str) -> bool {
        self.inner.config_exists(config_path)
    }

    /// Lists the options of `group_name`.
    ///
    /// `results_filter` equal to `"group"` selects groups; every other value, including
    /// `None`, selects configs.
    #[pyo3(signature = (group_name, results_filter=None))]
    fn get_group_options(&self, group_name: &str, results_filter: Option<&str>) -> Vec<String> {
        let wanted = match results_filter {
            Some("group") => ObjectType::Group,
            _ => ObjectType::Config,
        };
        self.inner.get_group_options(group_name, Some(wanted))
    }

    /// Clears the underlying repository's cache.
    fn clear_cache(&mut self) {
        self.inner.clear_cache();
    }

    /// Composes `config_name` with `overrides` and returns a dict with three keys:
    ///
    /// * `config` - the composed config as a dict;
    /// * `defaults` - a list of dicts with the keys `config_path`, `parent`, `package`,
    ///   `is_self`, `primary` and `override_key`;
    /// * `config_overrides` - a list of the config overrides reported by the repository.
    ///
    /// A missing `overrides` argument is treated as an empty list. Raises `RuntimeError`
    /// carrying the repository's error message on failure.
    #[pyo3(signature = (config_name=None, overrides=None))]
    fn load_and_compose(
        &mut self,
        py: Python,
        config_name: Option<&str>,
        overrides: Option<Vec<String>>,
    ) -> PyResult<Py<PyAny>> {
        let overrides: Vec<String> = overrides.unwrap_or_default();
        let composed = self
            .inner
            .load_and_compose(config_name, &overrides)
            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;

        let payload = PyDict::new(py);
        payload.set_item("config", config_dict_to_py(py, &composed.config)?)?;

        let defaults = PyList::empty(py);
        for resolved in &composed.defaults_result.defaults {
            let entry = PyDict::new(py);
            entry.set_item("config_path", resolved.config_path.as_deref())?;
            entry.set_item("parent", resolved.parent.as_deref())?;
            entry.set_item("package", resolved.package.as_deref())?;
            entry.set_item("is_self", resolved.is_self)?;
            entry.set_item("primary", resolved.primary)?;
            entry.set_item("override_key", resolved.override_key.as_deref())?;
            defaults.append(entry)?;
        }
        payload.set_item("defaults", defaults)?;

        let config_overrides = PyList::empty(py);
        for item in &composed.defaults_result.config_overrides {
            config_overrides.append(item)?;
        }
        payload.set_item("config_overrides", config_overrides)?;

        Ok(payload.into_any().unbind())
    }

    /// Returns the fixed string `RustCachingConfigRepository()`.
    fn __repr__(&self) -> String {
        // A plain literal needs no formatting machinery.
        String::from("RustCachingConfigRepository()")
    }
}
/// Registers the classes and functions listed below on the Python module `m`.
///
/// Returns the first error reported by `add_class` / `add_function`, if any.
pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Classes exposed to Python.
m.add_class::<PySearchPathEntry>()?;
m.add_class::<PyConfigLoader>()?;
m.add_class::<PyConfigRepository>()?;
m.add_class::<PyCachingConfigRepository>()?;
m.add_class::<PyHybridConfigRepository>()?;
// Module-level functions exposed to Python.
m.add_function(wrap_pyfunction!(parse_yaml, m)?)?;
m.add_function(wrap_pyfunction!(load_yaml_file, m)?)?;
m.add_function(wrap_pyfunction!(resolve_interpolations, m)?)?;
m.add_function(wrap_pyfunction!(compose_config, m)?)?;
m.add_function(wrap_pyfunction!(extract_header_dict, m)?)?;
Ok(())
}
/// Python-exposed `RustHybridConfigRepository` class.
///
/// File-backed search paths are served by a Rust repository, while `pkg://` and
/// `structured://` search paths are served through optional Python callbacks.
#[pyclass(name = "RustHybridConfigRepository")]
pub struct PyHybridConfigRepository {
    /// `(provider, path)` pairs whose path is neither `pkg://` nor `structured://`.
    file_sources: Vec<(String, String)>,
    /// `(provider, module_path)` pairs taken from `pkg://` paths, with the prefix removed.
    pkg_sources: Vec<(String, String)>,
    /// Providers of the `structured://` search paths.
    structured_sources: Vec<String>,
    /// Callback invoked as `(module_path, config_path)` to load a config from a package.
    pkg_loader: Option<Py<PyAny>>,
    /// Callback invoked as `(module_path, config_path)`; its result is read as a `bool`.
    pkg_config_exists: Option<Py<PyAny>>,
    /// Callback invoked as `(module_path, config_path)`; its result is read as a `bool`.
    pkg_group_exists: Option<Py<PyAny>>,
    /// Callback invoked as `(module_path, group_name)`; its result is read as a list of `str`.
    pkg_list_options: Option<Py<PyAny>>,
    /// Callback invoked as `(config_path,)` to load a structured config.
    structured_loader: Option<Py<PyAny>>,
    /// Callback invoked as `(config_path,)`; its result is read as a `bool`.
    structured_config_exists: Option<Py<PyAny>>,
    /// Callback invoked as `(config_path,)`; its result is read as a `bool`.
    structured_group_exists: Option<Py<PyAny>>,
    /// Callback invoked as `(group_name,)`; its result is read as a list of `str`.
    structured_list_options: Option<Py<PyAny>>,
    /// Repository over `file_sources`; `None` when there are no file sources.
    rust_repo: Option<RustConfigRepository>,
    /// Results of `load_config`, keyed by `"config:<config_path>"`; `None` records a miss.
    cache: std::collections::HashMap<String, Option<ConfigValue>>,
}
#[pymethods]
impl PyHybridConfigRepository {
    /// Creates a hybrid repository from `(provider, path)` pairs and optional callbacks.
    ///
    /// Paths are classified by prefix: `pkg://` paths become package sources (stored without
    /// the prefix), `structured://` paths become structured sources (only the provider is
    /// kept), and everything else - including `file://` paths - becomes a file source. A Rust
    /// repository is created only when at least one file source exists.
    #[new]
    #[pyo3(signature = (search_paths, pkg_loader=None, pkg_config_exists=None, pkg_group_exists=None, pkg_list_options=None, structured_loader=None, structured_config_exists=None, structured_group_exists=None, structured_list_options=None))]
    fn new(
        search_paths: Vec<(String, String)>,
        pkg_loader: Option<Py<PyAny>>,
        pkg_config_exists: Option<Py<PyAny>>,
        pkg_group_exists: Option<Py<PyAny>>,
        pkg_list_options: Option<Py<PyAny>>,
        structured_loader: Option<Py<PyAny>>,
        structured_config_exists: Option<Py<PyAny>>,
        structured_group_exists: Option<Py<PyAny>>,
        structured_list_options: Option<Py<PyAny>>,
    ) -> Self {
        let mut file_sources = Vec::new();
        let mut pkg_sources = Vec::new();
        let mut structured_sources = Vec::new();
        // `search_paths` is owned, so entries are moved into their bucket instead of cloned.
        for (provider, path) in search_paths {
            if let Some(module_path) = path.strip_prefix("pkg://") {
                pkg_sources.push((provider, module_path.to_string()));
            } else if path.starts_with("structured://") {
                structured_sources.push(provider);
            } else {
                // `file://` paths and paths without a recognised prefix are handled alike.
                file_sources.push((provider, path));
            }
        }

        let rust_repo = (!file_sources.is_empty()).then(|| {
            let elements: Vec<RustSearchPathElement> = file_sources
                .iter()
                .map(|(provider, path)| RustSearchPathElement::new(provider, path))
                .collect();
            RustConfigRepository::new(&elements)
        });

        Self {
            file_sources,
            pkg_sources,
            structured_sources,
            pkg_loader,
            pkg_config_exists,
            pkg_group_exists,
            pkg_list_options,
            structured_loader,
            structured_config_exists,
            structured_group_exists,
            structured_list_options,
            rust_repo,
            cache: std::collections::HashMap::new(),
        }
    }

    /// Loads the config at `config_path` and returns it as a Python object, or `None` when
    /// no source provides it.
    ///
    /// Both hits and misses are cached under `"config:<config_path>"`; later calls are
    /// answered from the cache until `clear_cache` is called. Errors raised by the Python
    /// callbacks are propagated and nothing is cached in that case.
    fn load_config(&mut self, py: Python, config_path: &str) -> PyResult<Option<Py<PyAny>>> {
        let cache_key = format!("config:{}", config_path);
        if let Some(cached) = self.cache.get(&cache_key) {
            return cached
                .as_ref()
                .map(|value| config_value_to_py(py, value))
                .transpose();
        }

        let found = self.lookup_uncached(py, config_path)?;
        let converted = found
            .as_ref()
            .map(|value| config_value_to_py(py, value))
            .transpose();
        // The lookup result is cached even if converting it to Python failed.
        self.cache.insert(cache_key, found);
        converted
    }

    /// Returns whether any source reports a config at `config_path`.
    ///
    /// The file-backed repository is asked first, then `pkg_config_exists` for every package
    /// source, then `structured_config_exists` when structured sources are present.
    fn config_exists(&self, py: Python, config_path: &str) -> PyResult<bool> {
        let in_files = self
            .rust_repo
            .as_ref()
            .is_some_and(|repo| repo.config_exists(config_path));
        if in_files {
            return Ok(true);
        }
        self.python_sources_contain(
            py,
            self.pkg_config_exists.as_ref(),
            self.structured_config_exists.as_ref(),
            config_path,
        )
    }

    /// Returns whether any source reports a group at `config_path`.
    ///
    /// The file-backed repository is asked first, then `pkg_group_exists` for every package
    /// source, then `structured_group_exists` when structured sources are present.
    fn group_exists(&self, py: Python, config_path: &str) -> PyResult<bool> {
        let in_files = self
            .rust_repo
            .as_ref()
            .is_some_and(|repo| repo.group_exists(config_path));
        if in_files {
            return Ok(true);
        }
        self.python_sources_contain(
            py,
            self.pkg_group_exists.as_ref(),
            self.structured_group_exists.as_ref(),
            config_path,
        )
    }

    /// Returns the sorted, de-duplicated options of `group_name` gathered from all sources.
    ///
    /// `results_filter` only affects the file-backed repository: `"group"` selects groups,
    /// every other value (including `None`) selects configs. The Python callbacks receive no
    /// filter.
    #[pyo3(signature = (group_name, results_filter=None))]
    fn get_group_options(
        &self,
        py: Python,
        group_name: &str,
        results_filter: Option<&str>,
    ) -> PyResult<Vec<String>> {
        let mut options = Vec::new();

        if let Some(repo) = &self.rust_repo {
            let wanted = match results_filter {
                Some("group") => ObjectType::Group,
                _ => ObjectType::Config,
            };
            options.extend(repo.get_group_options(group_name, Some(wanted)));
        }

        if let Some(list_fn) = &self.pkg_list_options {
            for (_provider, module_path) in &self.pkg_sources {
                let result = list_fn.call1(py, (module_path.as_str(), group_name))?;
                let items: Vec<String> = result.extract(py)?;
                options.extend(items);
            }
        }

        if !self.structured_sources.is_empty() {
            if let Some(list_fn) = &self.structured_list_options {
                let result = list_fn.call1(py, (group_name,))?;
                let items: Vec<String> = result.extract(py)?;
                options.extend(items);
            }
        }

        // Equal strings are indistinguishable, so an unstable sort gives the same result.
        options.sort_unstable();
        options.dedup();
        Ok(options)
    }

    /// Empties the `load_config` cache.
    fn clear_cache(&mut self) {
        self.cache.clear();
    }

    /// Returns `RustHybridConfigRepository(file_sources=N, pkg_sources=N, structured_sources=N)`.
    fn __repr__(&self) -> String {
        format!(
            "RustHybridConfigRepository(file_sources={}, pkg_sources={}, structured_sources={})",
            self.file_sources.len(),
            self.pkg_sources.len(),
            self.structured_sources.len()
        )
    }
}

// Private helpers; kept outside `#[pymethods]` so they are not exposed to Python.
impl PyHybridConfigRepository {
    /// Looks `config_path` up in every source, bypassing the cache.
    ///
    /// Sources are consulted in a fixed order: the file-backed repository, then each package
    /// source through `pkg_loader`, then `structured_loader` when structured sources exist.
    /// A Python callback returning `None` means "not found here".
    fn lookup_uncached(&self, py: Python, config_path: &str) -> PyResult<Option<ConfigValue>> {
        if let Some(repo) = &self.rust_repo {
            // NOTE(review): an `Err` from the file-backed repository is treated like a miss
            // and the lookup falls through to the Python sources - confirm this is intended.
            if let Ok(Some(result)) = repo.load_config(config_path) {
                return Ok(Some(result.config.clone()));
            }
        }

        if let Some(loader) = &self.pkg_loader {
            for (_provider, module_path) in &self.pkg_sources {
                let result = loader.call1(py, (module_path.as_str(), config_path))?;
                if !result.is_none(py) {
                    return py_to_config_value(py, result.bind(py)).map(Some);
                }
            }
        }

        if !self.structured_sources.is_empty() {
            if let Some(loader) = &self.structured_loader {
                let result = loader.call1(py, (config_path,))?;
                if !result.is_none(py) {
                    return py_to_config_value(py, result.bind(py)).map(Some);
                }
            }
        }

        Ok(None)
    }

    /// Asks the Python probes whether `config_path` exists in a package or structured source.
    ///
    /// `pkg_probe` is called as `(module_path, config_path)` for every package source;
    /// `structured_probe` is called as `(config_path,)` only when structured sources exist.
    /// Returns `true` at the first probe whose result extracts to `true`.
    fn python_sources_contain(
        &self,
        py: Python,
        pkg_probe: Option<&Py<PyAny>>,
        structured_probe: Option<&Py<PyAny>>,
        config_path: &str,
    ) -> PyResult<bool> {
        if let Some(probe) = pkg_probe {
            for (_provider, module_path) in &self.pkg_sources {
                let result = probe.call1(py, (module_path.as_str(), config_path))?;
                if result.extract::<bool>(py)? {
                    return Ok(true);
                }
            }
        }

        if !self.structured_sources.is_empty() {
            if let Some(probe) = structured_probe {
                let result = probe.call1(py, (config_path,))?;
                if result.extract::<bool>(py)? {
                    return Ok(true);
                }
            }
        }

        Ok(false)
    }
}