pub fn gen_pyo3_error_types(error: &ErrorDef, module_name: &str, seen_exceptions: &mut AHashSet<String>) -> String {
let mut variant_names = Vec::new();
for variant in &error.variants {
let variant_name = python_exception_name(&variant.name, &error.name);
if seen_exceptions.insert(variant_name.clone()) {
variant_names.push(variant_name);
}
}
let include_base = seen_exceptions.insert(error.name.clone());
crate::codegen::template_env::render(
"error_gen/pyo3_error_types.jinja",
minijinja::context! {
variant_names => variant_names,
module_name => module_name,
error_name => error.name.as_str(),
include_base => include_base,
},
)
}
pub fn gen_pyo3_error_converter(error: &ErrorDef, core_import: &str) -> String {
let rust_path = if error.rust_path.is_empty() {
format!("{core_import}::{}", error.name)
} else {
let normalized = error.rust_path.replace('-', "_");
let segments: Vec<&str> = normalized.split("::").collect();
if segments.len() > 2 {
let crate_name = segments[0];
let error_name = segments[segments.len() - 1];
format!("{crate_name}::{error_name}")
} else {
normalized
}
};
let fn_name = format!("{}_to_py_err", to_snake_case(&error.name));
let has_methods = !error.methods.is_empty();
let mut variants = Vec::new();
for variant in &error.variants {
let pattern = error_variant_wildcard_pattern(&rust_path, variant);
let variant_exc_name = python_exception_name(&variant.name, &error.name);
variants.push((pattern, variant_exc_name));
}
crate::codegen::template_env::render(
"error_gen/pyo3_error_converter.jinja",
minijinja::context! {
rust_path => rust_path.as_str(),
fn_name => fn_name.as_str(),
error_name => error.name.as_str(),
variants => variants,
has_methods => has_methods,
},
)
}
pub fn gen_pyo3_error_registration(error: &ErrorDef, seen_registrations: &mut AHashSet<String>) -> Vec<String> {
let mut registrations = Vec::with_capacity(error.variants.len() + 1);
for variant in &error.variants {
let variant_exc_name = python_exception_name(&variant.name, &error.name);
if seen_registrations.insert(variant_exc_name.clone()) {
registrations.push(format!(
" m.add(\"{}\", m.py().get_type::<{}>())?;",
variant_exc_name, variant_exc_name
));
}
}
if seen_registrations.insert(error.name.clone()) {
registrations.push(format!(
" m.add(\"{}\", m.py().get_type::<{}>())?;",
error.name, error.name
));
}
registrations
}
pub fn converter_fn_name(error: &ErrorDef) -> String {
format!("{}_to_py_err", to_snake_case(&error.name))
}
pub fn gen_pyo3_error_methods_impl(error: &ErrorDef) -> String {
if error.methods.is_empty() {
return String::new();
}
let struct_name = format!("{}Info", error.name);
let snake_name = to_snake_case(&error.name);
let fn_name = format!("{snake_name}_info");
let mut fields = Vec::new();
let mut getters = Vec::new();
let has_status_code = error.methods.iter().any(|m| m.name == "status_code");
let has_is_transient = error.methods.iter().any(|m| m.name == "is_transient");
let has_error_type = error.methods.iter().any(|m| m.name == "error_type");
if has_status_code {
fields.push(" pub status_code: u16,".to_string());
getters.push(
concat!(
" /// HTTP status code for this error (0 means no associated status).\n",
" #[getter]\n",
" fn status_code(&self) -> u16 {\n",
" self.status_code\n",
" }",
)
.to_string(),
);
}
if has_is_transient {
fields.push(" pub is_transient: bool,".to_string());
getters.push(
concat!(
" /// Returns `true` if the error is transient and a retry may succeed.\n",
" #[getter]\n",
" fn is_transient(&self) -> bool {\n",
" self.is_transient\n",
" }",
)
.to_string(),
);
}
if has_error_type {
fields.push(" pub error_type: String,".to_string());
getters.push(
concat!(
" /// Machine-readable error category string for matching and logging.\n",
" #[getter]\n",
" fn error_type(&self) -> String {\n",
" self.error_type.clone()\n",
" }",
)
.to_string(),
);
}
for method in &error.methods {
match method.name.as_str() {
"status_code" | "is_transient" | "error_type" => {}
other => getters.push(format!(
" // Not emitted: getter for method `{other}` on `{struct_name}`"
)),
}
}
let mut ctor_fields = Vec::new();
if has_status_code {
ctor_fields.push(
" status_code: args\n\
\x20 .as_ref()\n\
\x20 .and_then(|a| a.get_item(1).ok())\n\
\x20 .and_then(|v| v.extract::<u16>().ok())\n\
\x20 .unwrap_or(0),",
);
}
if has_is_transient {
ctor_fields.push(
" is_transient: args\n\
\x20 .as_ref()\n\
\x20 .and_then(|a| a.get_item(2).ok())\n\
\x20 .and_then(|v| v.extract::<bool>().ok())\n\
\x20 .unwrap_or(false),",
);
}
if has_error_type {
ctor_fields.push(
" error_type: args\n\
\x20 .as_ref()\n\
\x20 .and_then(|a| a.get_item(3).ok())\n\
\x20 .and_then(|v| v.extract::<String>().ok())\n\
\x20 .unwrap_or_default(),",
);
}
let struct_def = format!(
"#[pyclass(name = \"{struct_name}\")]\npub struct {struct_name} {{\n{}\n}}",
fields.join("\n")
);
let impl_block = format!("#[pymethods]\nimpl {struct_name} {{\n{}\n}}", getters.join("\n\n"));
let free_fn = format!(
"/// Build a `{struct_name}` from any exception raised by the `{error_name}` hierarchy.\n\
///\n\
/// The converter stores `(message, status_code, is_transient, error_type)` in the\n\
/// exception args tuple; this function extracts those values at indices 1–3.\n\
#[pyfunction]\n\
pub fn {fn_name}(err: pyo3::Bound<'_, pyo3::types::PyAny>) -> {struct_name} {{\n\
let args = err.getattr(\"args\").ok();\n\
{struct_name} {{\n\
{ctor}\n\
}}\n\
}}",
error_name = error.name,
ctor = ctor_fields.join("\n"),
);
format!("{struct_def}\n\n{impl_block}\n\n{free_fn}")
}
pub fn pyo3_error_has_methods(error: &ErrorDef) -> bool {
!error.methods.is_empty()
}
pub fn pyo3_error_info_struct_name(error: &ErrorDef) -> String {
format!("{}Info", error.name)
}
pub fn pyo3_error_info_fn_name(error: &ErrorDef) -> String {
format!("{}_info", to_snake_case(&error.name))
}
use crate::core::ir::ErrorDef;
use ahash::AHashSet;
use super::shared::{error_variant_wildcard_pattern, python_exception_name, to_snake_case};