use super::parsing::{escape_python_string, find_matching_paren};
use pyisheval::{Interpreter, Value};
use regex::Regex;
use std::collections::HashMap;
use std::sync::OnceLock;
#[cfg(feature = "yaml")]
use saphyr::{LoadableYamlNode, Scalar, Yaml};
#[cfg(feature = "yaml")]
pub(super) fn yaml_to_python_literal(
value: Yaml,
path: &str,
yaml_tag_handler_registry: Option<&crate::eval::yaml_tag_handler::YamlTagHandlerRegistry>,
) -> Result<String, crate::error::XacroError> {
let format_f64 = |num: f64| -> String {
if num.is_nan() {
"nan".to_string()
} else if num.is_infinite() {
if num.is_sign_negative() {
"-inf".to_string()
} else {
"inf".to_string()
}
} else {
num.to_string()
}
};
match value {
Yaml::Value(scalar) => match scalar {
Scalar::Null => Ok("None".to_string()),
Scalar::Boolean(b) => {
if b {
Ok("True".to_string())
} else {
Ok("False".to_string())
}
}
Scalar::Integer(i) => Ok(i.to_string()),
Scalar::FloatingPoint(f) => Ok(format_f64(*f)),
Scalar::String(s) => {
if let Ok(num) = s.parse::<f64>() {
Ok(format_f64(num))
} else {
let escaped = escape_python_string(&s);
Ok(format!("'{}'", escaped))
}
}
},
Yaml::Sequence(seq) => {
let elements: Result<Vec<String>, _> = seq
.into_iter()
.map(|v| yaml_to_python_literal(v, path, yaml_tag_handler_registry))
.collect();
Ok(format!("[{}]", elements?.join(", ")))
}
Yaml::Mapping(map) => {
let entries: Result<Vec<String>, crate::error::XacroError> = map
.into_iter()
.map(|(k, v)| -> Result<String, crate::error::XacroError> {
let key_str = match &k {
Yaml::Value(Scalar::String(s)) => {
let escaped = escape_python_string(s);
format!("'{}'", escaped)
}
_ => yaml_to_python_literal(k, path, yaml_tag_handler_registry)?, };
let value_str = yaml_to_python_literal(v, path, yaml_tag_handler_registry)?;
Ok(format!("{}: {}", key_str, value_str))
})
.collect();
Ok(format!("{{{}}}", entries?.join(", ")))
}
Yaml::Tagged(tag, inner) => {
if matches!(&*inner, Yaml::Sequence(_) | Yaml::Mapping(_)) {
log::warn!(
"YAML tag '!{}' applied to non-scalar value (sequence/mapping) at '{}' - Python xacro does not support this. Value will be used without conversion.",
tag.suffix, path
);
return yaml_to_python_literal(*inner, path, yaml_tag_handler_registry);
}
let is_string_scalar = matches!(&*inner, Yaml::Value(Scalar::String(_)));
let raw_value = match &*inner {
Yaml::Value(Scalar::Integer(i)) => i.to_string(),
Yaml::Value(Scalar::FloatingPoint(f)) => format_f64(**f),
Yaml::Value(Scalar::String(s)) => s.to_string(),
Yaml::Value(Scalar::Null) => "None".to_string(),
Yaml::Value(Scalar::Boolean(b)) => {
if *b {
"True".to_string()
} else {
"False".to_string()
}
}
_ => yaml_to_python_literal(*inner, path, yaml_tag_handler_registry)?,
};
if let Some(registry) = yaml_tag_handler_registry {
if let Some(result) = registry.handle_tag(&tag.suffix, &raw_value) {
return Ok(result);
}
}
log::warn!(
"Unknown YAML tag '!{}' - value will be used without conversion",
tag.suffix
);
if is_string_scalar {
let escaped = escape_python_string(&raw_value);
Ok(format!("'{}'", escaped))
} else {
Ok(raw_value)
}
}
Yaml::Representation(repr, _style, _tag) => {
yaml_to_python_literal(Yaml::value_from_str(&repr), path, yaml_tag_handler_registry)
}
Yaml::Alias(_) => Err(crate::error::XacroError::YamlParseError {
path: path.to_string(),
message: "YAML aliases are not supported".to_string(),
}),
Yaml::BadValue => Err(crate::error::XacroError::YamlParseError {
path: path.to_string(),
message: "YAML contains an invalid value".to_string(),
}),
}
}
#[cfg(feature = "yaml")]
pub(super) fn load_yaml_file(
path: &str,
yaml_tag_handler_registry: Option<&crate::eval::yaml_tag_handler::YamlTagHandlerRegistry>,
) -> Result<String, crate::error::XacroError> {
let contents = std::fs::read_to_string(path).map_err(|source| {
crate::error::XacroError::YamlLoadError {
path: path.to_string(),
source,
}
})?;
let docs =
Yaml::load_from_str(&contents).map_err(|e| crate::error::XacroError::YamlParseError {
path: path.to_string(),
message: e.to_string(),
})?;
let yaml_value =
docs.into_iter()
.next()
.ok_or_else(|| crate::error::XacroError::YamlParseError {
path: path.to_string(),
message: "YAML file contains no documents".to_string(),
})?;
yaml_to_python_literal(yaml_value, path, yaml_tag_handler_registry)
}
#[cfg(feature = "yaml")]
static LOAD_YAML_REGEX: OnceLock<Regex> = OnceLock::new();
#[cfg(feature = "yaml")]
pub(super) fn get_load_yaml_regex() -> &'static Regex {
LOAD_YAML_REGEX.get_or_init(|| {
Regex::new(r"\b(?:xacro\.)?load_yaml\s*\(").expect("load_yaml regex should be valid")
})
}
#[cfg(feature = "yaml")]
pub(super) fn preprocess_load_yaml(
expr: &str,
interp: &mut Interpreter,
context: &HashMap<String, Value>,
yaml_tag_handler_registry: Option<&crate::eval::yaml_tag_handler::YamlTagHandlerRegistry>,
) -> Result<String, super::EvalError> {
let regex = get_load_yaml_regex();
let mut result = expr.to_string();
let mut iteration = 0;
const MAX_ITERATIONS: usize = 100;
loop {
iteration += 1;
if iteration > MAX_ITERATIONS {
return Err(super::EvalError::PyishEval {
expr: expr.to_string(),
source: pyisheval::EvalError::ParseError(
"Too many nested load_yaml() calls (possible infinite loop)".to_string(),
),
});
}
let captures: Vec<_> = regex.captures_iter(&result).collect();
if captures.is_empty() {
break;
}
let mut made_replacement = false;
for caps in captures.iter().rev() {
let whole_match = match caps.get(0) {
Some(m) => m,
None => continue,
};
let paren_pos = whole_match.end() - 1;
let close_pos = match find_matching_paren(&result, paren_pos) {
Some(pos) => pos,
None => continue, };
let filename_arg = &result[paren_pos + 1..close_pos];
let filename = if filename_arg.len() >= 2
&& ((filename_arg.starts_with('\'') && filename_arg.ends_with('\''))
|| (filename_arg.starts_with('"') && filename_arg.ends_with('"')))
{
filename_arg[1..filename_arg.len() - 1].to_string()
} else {
match interp.eval_with_context(filename_arg, context) {
Ok(Value::StringLit(s)) => s,
Ok(other) => {
return Err(super::EvalError::PyishEval {
expr: format!("load_yaml({})", filename_arg),
source: pyisheval::EvalError::ParseError(format!(
"load_yaml() filename must be a string, got: {:?}",
other
)),
});
}
Err(e) => {
return Err(super::EvalError::PyishEval {
expr: format!("load_yaml({})", filename_arg),
source: e,
});
}
}
};
let python_literal =
load_yaml_file(&filename, yaml_tag_handler_registry).map_err(|e| {
super::EvalError::PyishEval {
expr: format!("load_yaml('{}')", filename),
source: pyisheval::EvalError::ParseError(e.to_string()),
}
})?;
result.replace_range(whole_match.start()..=close_pos, &python_literal);
made_replacement = true;
break; }
if !made_replacement {
break;
}
}
Ok(result)
}
#[cfg(not(feature = "yaml"))]
pub(super) fn preprocess_load_yaml(
expr: &str,
_interp: &mut Interpreter,
_context: &HashMap<String, Value>,
) -> Result<String, super::EvalError> {
static LOAD_YAML_REGEX: OnceLock<Regex> = OnceLock::new();
let regex = LOAD_YAML_REGEX.get_or_init(|| {
Regex::new(r"\b(?:xacro\.)?load_yaml\s*\(").expect("load_yaml regex should be valid")
});
if regex.is_match(expr) {
return Err(super::EvalError::PyishEval {
expr: expr.to_string(),
source: pyisheval::EvalError::ParseError(
crate::error::XacroError::YamlFeatureDisabled.to_string(),
),
});
}
Ok(expr.to_string())
}