use log::debug;
use once_cell::sync::Lazy;
use pyo3::prelude::*;
use pyo3::types::PyModule;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
static COMMAND_COUNTER: AtomicU64 = AtomicU64::new(0);
pub fn register(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
debug!("Registering Angreal types to Python module");
m.add_class::<AngrealCommand>()?;
m.add_class::<AngrealArg>()?;
m.add_class::<AngrealGroup>()?;
m.add_class::<ToolDescription>()?;
debug!("Successfully registered all Angreal types");
Ok(())
}
pub static ANGREAL_TASKS: Lazy<Mutex<HashMap<String, AngrealCommand>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub static ANGREAL_ARGS: Lazy<Mutex<HashMap<String, Vec<AngrealArg>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub static ANGREAL_GROUPS: Lazy<Mutex<Vec<AngrealGroup>>> = Lazy::new(|| Mutex::new(vec![]));
thread_local! {
static LAST_COMMAND_PATH: RefCell<Option<String>> = const { RefCell::new(None) };
}
pub fn set_current_command_path(path: String) {
LAST_COMMAND_PATH.with(|p| *p.borrow_mut() = Some(path));
}
pub fn get_current_command_path() -> Option<String> {
LAST_COMMAND_PATH.with(|p| p.borrow().clone())
}
pub fn generate_command_path_key(command: &AngrealCommand) -> String {
generate_command_path_key_from_parts(command.group.as_deref(), &command.name)
}
pub fn generate_command_path_key_from_parts(groups: Option<&[AngrealGroup]>, name: &str) -> String {
let path = match groups {
None | Some([]) => name.to_string(),
Some(groups) => {
let group_path = groups
.iter()
.map(|g| g.name.clone())
.collect::<Vec<_>>()
.join(".");
format!("{}.{}", group_path, name)
}
};
path.strip_prefix('.').unwrap_or(&path).to_string()
}
pub fn generate_path_key_from_parts(groups: &[String], command_name: &str) -> String {
let path = if groups.is_empty() {
command_name.to_string()
} else {
format!("{}.{}", groups.join("."), command_name)
};
path.strip_prefix('.').unwrap_or(&path).to_string()
}
#[derive(Clone, Debug)]
#[pyclass(name = "Group")]
pub struct AngrealGroup {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub about: Option<String>,
}
#[derive(Clone, Debug)]
#[pyclass(name = "ToolDescription")]
pub struct ToolDescription {
#[pyo3(get)]
pub description: String,
#[pyo3(get)]
pub risk_level: String,
}
#[pymethods]
impl ToolDescription {
#[new]
#[pyo3(signature = (description, *, risk_level = None))]
fn __new__(description: &str, risk_level: Option<&str>) -> Self {
let risk = risk_level.unwrap_or("safe");
let validated_risk = match risk {
"safe" | "read_only" | "destructive" => risk.to_string(),
_ => {
log::warn!(
"Invalid risk_level '{}', defaulting to 'safe'. Valid values: safe, read_only, destructive",
risk
);
"safe".to_string()
}
};
ToolDescription {
description: description.to_string(),
risk_level: validated_risk,
}
}
fn __repr__(&self) -> String {
format!(
"ToolDescription(description=<{} chars>, risk_level='{}')",
self.description.len(),
self.risk_level
)
}
}
#[pymethods]
impl AngrealGroup {
#[new]
#[pyo3(signature = (name, about=None))]
fn __new__(name: &str, about: Option<&str>) -> Self {
let group = AngrealGroup {
name: name.to_string(),
about: about.map(|i| i.to_string()),
};
let mut groups = ANGREAL_GROUPS.lock().unwrap();
if !groups.iter().any(|g| g.name == group.name) {
debug!("Adding new group: {}", group.name);
groups.push(group.clone());
} else {
debug!("Group {} already exists, skipping add", group.name);
}
drop(groups);
debug!(
"Current ANGREAL_GROUPS state: {:#?}",
ANGREAL_GROUPS.lock().unwrap()
);
group
}
}
#[derive(Debug)]
#[pyclass(name = "Command")]
pub struct AngrealCommand {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub about: Option<String>,
#[pyo3(get)]
pub long_about: Option<String>,
#[pyo3(get)]
pub func: Py<PyAny>,
#[pyo3(get)]
pub group: Option<Vec<AngrealGroup>>,
#[pyo3(get)]
pub tool: Option<ToolDescription>,
pub registry_key: Option<String>,
}
impl Clone for AngrealCommand {
fn clone(&self) -> Self {
Python::attach(|py| Self {
name: self.name.clone(),
about: self.about.clone(),
long_about: self.long_about.clone(),
func: self.func.clone_ref(py),
group: self.group.clone(),
tool: self.tool.clone(),
registry_key: self.registry_key.clone(),
})
}
}
#[pymethods]
impl AngrealCommand {
#[new]
#[pyo3(signature = (name, func, about=None, long_about=None, group=None, tool=None))]
fn __new__(
name: &str,
func: Py<PyAny>,
about: Option<&str>,
long_about: Option<&str>,
group: Option<Vec<AngrealGroup>>,
tool: Option<ToolDescription>,
) -> Self {
debug!("Creating new AngrealCommand with name: {}", name);
let id = COMMAND_COUNTER.fetch_add(1, Ordering::Relaxed);
let path_key = generate_command_path_key_from_parts(group.as_deref(), name);
let registry_key = format!("{}.__reg_{}", path_key, id);
let cmd = AngrealCommand {
name: name.to_string(),
about: about.map(|i| i.to_string()),
long_about: long_about.map(|i| i.to_string()),
group,
func,
tool,
registry_key: Some(registry_key.clone()),
};
ANGREAL_TASKS
.lock()
.unwrap()
.insert(registry_key.clone(), cmd.clone());
set_current_command_path(registry_key.clone());
debug!(
"Registered new command '{}' with registry key: {}",
name, registry_key
);
debug!(
"Updated ANGREAL_TASKS registry size: {}",
ANGREAL_TASKS.lock().unwrap().len()
);
cmd
}
pub fn add_group(&mut self, group: AngrealGroup) -> PyResult<()> {
debug!("Adding group '{}' to command '{}'", group.name, self.name);
let old_registry_key = self
.registry_key
.clone()
.unwrap_or_else(|| generate_command_path_key(self));
if self.group.is_none() {
debug!(
"Initializing empty group vector for command '{}'",
self.name
);
self.group = Some(Vec::new());
}
let mut g = self.group.as_mut().unwrap().clone();
debug!("Adding group '{}' to command '{}'", group.name, self.name);
g.insert(0, group);
self.group = Some(g.clone());
let id = COMMAND_COUNTER.fetch_add(1, Ordering::Relaxed);
let new_path_key = generate_command_path_key(self);
let new_registry_key = format!("{}.__reg_{}", new_path_key, id);
self.registry_key = Some(new_registry_key.clone());
let mut tasks = ANGREAL_TASKS.lock().unwrap();
if let Some(_cmd) = tasks.remove(&old_registry_key) {
tasks.insert(new_registry_key.clone(), self.clone());
debug!(
"Updated command registry key from '{}' to '{}'",
old_registry_key, new_registry_key
);
} else {
tasks.insert(new_registry_key.clone(), self.clone());
debug!(
"Inserted command with new registry key: '{}'",
new_registry_key
);
}
debug!("Current ANGREAL_TASKS registry size: {}", tasks.len());
drop(tasks);
let mut args_registry = ANGREAL_ARGS.lock().unwrap();
if let Some(args) = args_registry.remove(&old_registry_key) {
args_registry.insert(new_registry_key.clone(), args);
debug!(
"Moved arguments from '{}' to '{}'",
old_registry_key, new_registry_key
);
}
Ok(())
}
}
#[derive(Clone, Debug)]
#[pyclass(name = "Arg")]
pub struct AngrealArg {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub command_name: String,
pub command_path: String,
#[pyo3(get)]
pub takes_value: Option<bool>,
#[pyo3(get)]
pub default_value: Option<String>,
#[pyo3(get)]
pub is_flag: Option<bool>,
#[pyo3(get)]
pub require_equals: Option<bool>,
#[pyo3(get)]
pub multiple_values: Option<bool>,
#[pyo3(get)]
pub number_of_values: Option<u32>,
#[pyo3(get)]
pub max_values: Option<u32>,
#[pyo3(get)]
pub min_values: Option<u32>,
#[pyo3(get)]
pub python_type: Option<String>,
#[pyo3(get)]
pub short: Option<char>,
#[pyo3(get)]
pub long: Option<String>,
#[pyo3(get)]
pub long_help: Option<String>,
#[pyo3(get)]
pub help: Option<String>,
#[pyo3(get)]
pub required: Option<bool>,
}
#[pymethods]
impl AngrealArg {
#[new]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, command_name, default_value=None, is_flag=None, require_equals=None, multiple_values=None, number_of_values=None, max_values=None, min_values=None, short=None, long=None, long_help=None, help=None, required=None, takes_value=None, python_type=None))]
fn __new__(
name: &str,
command_name: &str,
default_value: Option<&str>,
is_flag: Option<bool>,
require_equals: Option<bool>,
multiple_values: Option<bool>,
number_of_values: Option<u32>,
max_values: Option<u32>,
min_values: Option<u32>,
short: Option<char>,
long: Option<&str>,
long_help: Option<&str>,
help: Option<&str>,
required: Option<bool>,
takes_value: Option<bool>,
python_type: Option<&str>,
) -> Self {
debug!(
"Creating new AngrealArg '{}' for command '{}'",
name, command_name
);
let command_path = get_current_command_path().unwrap_or_else(|| command_name.to_string());
let arg = AngrealArg {
name: name.to_string(),
command_name: command_name.to_string(),
command_path: command_path.clone(),
takes_value: Some(takes_value.unwrap_or(true)),
default_value: default_value.map(|i| i.to_string()),
is_flag: Some(is_flag.unwrap_or(false)),
require_equals,
multiple_values,
number_of_values,
max_values,
min_values,
python_type: Some(python_type.unwrap_or("str").to_string()),
short,
long: long.map(|i| i.to_string()),
long_help: long_help.map(|i| i.to_string()),
help: help.map(|i| i.to_string()),
required,
};
let mut args_registry = ANGREAL_ARGS.lock().unwrap();
args_registry
.entry(command_path.clone())
.or_default()
.push(arg.clone());
debug!(
"Registered new argument '{}' for command path '{}'",
name, command_path
);
debug!(
"Current ANGREAL_ARGS registry has {} command paths",
args_registry.len()
);
arg
}
}
#[cfg(test)]
mod tests {
use super::*;
use pyo3::Python;
#[test]
fn test_hierarchical_command_registration() {
Python::attach(|py| {
let original_tasks = ANGREAL_TASKS.lock().unwrap().clone();
let original_args = ANGREAL_ARGS.lock().unwrap().clone();
ANGREAL_TASKS.lock().unwrap().clear();
ANGREAL_ARGS.lock().unwrap().clear();
let group1 = AngrealGroup {
name: "group1".to_string(),
about: Some("First group".to_string()),
};
let group2 = AngrealGroup {
name: "group2".to_string(),
about: Some("Second group".to_string()),
};
let cmd1 = AngrealCommand {
name: "all".to_string(),
about: Some("Run all tests in group1".to_string()),
long_about: None,
group: Some(vec![group1.clone()]),
func: py.None(),
tool: None,
registry_key: None,
};
let cmd2 = AngrealCommand {
name: "all".to_string(),
about: Some("Run all tests in group2".to_string()),
long_about: None,
group: Some(vec![group2.clone()]),
func: py.None(),
tool: None,
registry_key: None,
};
let path1 = generate_command_path_key(&cmd1);
let path2 = generate_command_path_key(&cmd2);
ANGREAL_TASKS.lock().unwrap().insert(path1.clone(), cmd1);
ANGREAL_TASKS.lock().unwrap().insert(path2.clone(), cmd2);
assert_eq!(path1, "group1.all");
assert_eq!(path2, "group2.all");
assert_eq!(ANGREAL_TASKS.lock().unwrap().len(), 2);
assert!(ANGREAL_TASKS.lock().unwrap().get("group1.all").is_some());
assert!(ANGREAL_TASKS.lock().unwrap().get("group2.all").is_some());
let retrieved_cmd1 = ANGREAL_TASKS
.lock()
.unwrap()
.get("group1.all")
.unwrap()
.clone();
let retrieved_cmd2 = ANGREAL_TASKS
.lock()
.unwrap()
.get("group2.all")
.unwrap()
.clone();
assert_eq!(
retrieved_cmd1.about,
Some("Run all tests in group1".to_string())
);
assert_eq!(
retrieved_cmd2.about,
Some("Run all tests in group2".to_string())
);
*ANGREAL_TASKS.lock().unwrap() = original_tasks;
*ANGREAL_ARGS.lock().unwrap() = original_args;
});
}
#[test]
fn test_argument_collision_resolution() {
Python::attach(|py| {
let original_tasks = ANGREAL_TASKS.lock().unwrap().clone();
let original_args = ANGREAL_ARGS.lock().unwrap().clone();
ANGREAL_TASKS.lock().unwrap().clear();
ANGREAL_ARGS.lock().unwrap().clear();
let group1 = AngrealGroup {
name: "group1".to_string(),
about: None,
};
let group2 = AngrealGroup {
name: "group2".to_string(),
about: None,
};
let cmd1 = AngrealCommand {
name: "test".to_string(),
about: None,
long_about: None,
group: Some(vec![group1]),
func: py.None(),
tool: None,
registry_key: None,
};
let cmd2 = AngrealCommand {
name: "test".to_string(),
about: None,
long_about: None,
group: Some(vec![group2]),
func: py.None(),
tool: None,
registry_key: None,
};
let path1 = generate_command_path_key(&cmd1);
let path2 = generate_command_path_key(&cmd2);
ANGREAL_TASKS.lock().unwrap().insert(path1.clone(), cmd1);
ANGREAL_TASKS.lock().unwrap().insert(path2.clone(), cmd2);
let arg1 = AngrealArg {
name: "verbose".to_string(),
command_name: "test".to_string(),
command_path: path1.clone(),
takes_value: Some(false),
default_value: None,
is_flag: Some(true),
require_equals: None,
multiple_values: None,
number_of_values: None,
max_values: None,
min_values: None,
python_type: Some("bool".to_string()),
short: Some('v'),
long: Some("verbose".to_string()),
long_help: None,
help: Some("Verbose output".to_string()),
required: Some(false),
};
let arg2 = AngrealArg {
name: "force".to_string(),
command_name: "test".to_string(),
command_path: path2.clone(),
takes_value: Some(false),
default_value: None,
is_flag: Some(true),
require_equals: None,
multiple_values: None,
number_of_values: None,
max_values: None,
min_values: None,
python_type: Some("bool".to_string()),
short: Some('f'),
long: Some("force".to_string()),
long_help: None,
help: Some("Force operation".to_string()),
required: Some(false),
};
ANGREAL_ARGS
.lock()
.unwrap()
.entry(path1.clone())
.or_default()
.push(arg1);
ANGREAL_ARGS
.lock()
.unwrap()
.entry(path2.clone())
.or_default()
.push(arg2);
let args1 = crate::builder::select_args(&path1);
let args2 = crate::builder::select_args(&path2);
assert_eq!(args1.len(), 1);
assert_eq!(args2.len(), 1);
assert_eq!(args1[0].name, "verbose");
assert_eq!(args2[0].name, "force");
assert_eq!(args1[0].command_path, "group1.test");
assert_eq!(args2[0].command_path, "group2.test");
*ANGREAL_TASKS.lock().unwrap() = original_tasks;
*ANGREAL_ARGS.lock().unwrap() = original_args;
});
}
#[test]
fn test_path_key_generation() {
let key = generate_path_key_from_parts(&[], "build");
assert_eq!(key, "build");
let key = generate_path_key_from_parts(&["docker".to_string()], "run");
assert_eq!(key, "docker.run");
let key =
generate_path_key_from_parts(&["docker".to_string(), "compose".to_string()], "up");
assert_eq!(key, "docker.compose.up");
}
#[test]
fn test_unique_registry_keys_prevent_collision() {
Python::attach(|py| {
let original_tasks = ANGREAL_TASKS.lock().unwrap().clone();
let original_args = ANGREAL_ARGS.lock().unwrap().clone();
ANGREAL_TASKS.lock().unwrap().clear();
ANGREAL_ARGS.lock().unwrap().clear();
let top_build = AngrealCommand::__new__(
"build",
py.None(),
Some("compile the project"),
None,
None,
None,
);
let top_key = top_build.registry_key.clone().unwrap();
let mut docs_build = AngrealCommand::__new__(
"build",
py.None(),
Some("build the docs"),
None,
None,
None,
);
let docs_group = AngrealGroup {
name: "docs".to_string(),
about: Some("documentation commands".to_string()),
};
docs_build.add_group(docs_group).unwrap();
let docs_key = docs_build.registry_key.clone().unwrap();
assert_ne!(top_key, docs_key, "registry keys must differ");
let tasks = ANGREAL_TASKS.lock().unwrap();
assert!(
tasks.get(&top_key).is_some(),
"top-level build must be in registry"
);
assert!(
tasks.get(&docs_key).is_some(),
"docs build must be in registry"
);
let top_cmd = tasks.get(&top_key).unwrap();
let docs_cmd = tasks.get(&docs_key).unwrap();
assert_eq!(generate_command_path_key(top_cmd), "build");
assert_eq!(generate_command_path_key(docs_cmd), "docs.build");
assert_eq!(top_cmd.about, Some("compile the project".to_string()));
assert_eq!(docs_cmd.about, Some("build the docs".to_string()));
drop(tasks);
*ANGREAL_TASKS.lock().unwrap() = original_tasks;
*ANGREAL_ARGS.lock().unwrap() = original_args;
});
}
#[test]
fn test_registry_key_args_isolation() {
Python::attach(|py| {
let original_tasks = ANGREAL_TASKS.lock().unwrap().clone();
let original_args = ANGREAL_ARGS.lock().unwrap().clone();
ANGREAL_TASKS.lock().unwrap().clear();
ANGREAL_ARGS.lock().unwrap().clear();
let top_build =
AngrealCommand::__new__("build", py.None(), Some("compile"), None, None, None);
let top_key = top_build.registry_key.clone().unwrap();
let release_arg = AngrealArg {
name: "release".to_string(),
command_name: "build".to_string(),
command_path: top_key.clone(),
takes_value: Some(false),
default_value: None,
is_flag: Some(true),
require_equals: None,
multiple_values: None,
number_of_values: None,
max_values: None,
min_values: None,
python_type: Some("bool".to_string()),
short: Some('r'),
long: Some("release".to_string()),
long_help: None,
help: None,
required: Some(false),
};
ANGREAL_ARGS
.lock()
.unwrap()
.entry(top_key.clone())
.or_default()
.push(release_arg);
let mut docs_build =
AngrealCommand::__new__("build", py.None(), Some("build docs"), None, None, None);
let pre_group_key = docs_build.registry_key.clone().unwrap();
let format_arg = AngrealArg {
name: "format".to_string(),
command_name: "build".to_string(),
command_path: pre_group_key.clone(),
takes_value: Some(true),
default_value: Some("html".to_string()),
is_flag: Some(false),
require_equals: None,
multiple_values: None,
number_of_values: None,
max_values: None,
min_values: None,
python_type: Some("str".to_string()),
short: Some('f'),
long: Some("format".to_string()),
long_help: None,
help: None,
required: Some(false),
};
ANGREAL_ARGS
.lock()
.unwrap()
.entry(pre_group_key.clone())
.or_default()
.push(format_arg);
docs_build
.add_group(AngrealGroup {
name: "docs".to_string(),
about: None,
})
.unwrap();
let docs_key = docs_build.registry_key.clone().unwrap();
let top_args = crate::builder::select_args(&top_key);
let docs_args = crate::builder::select_args(&docs_key);
assert_eq!(top_args.len(), 1);
assert_eq!(top_args[0].name, "release");
assert_eq!(docs_args.len(), 1);
assert_eq!(docs_args[0].name, "format");
*ANGREAL_TASKS.lock().unwrap() = original_tasks;
*ANGREAL_ARGS.lock().unwrap() = original_args;
});
}
}