use pyo3::create_exception;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use crate::core::{self, Action, Pattern, Target};
use crate::source::cli;
use std::collections::{HashMap, HashSet};
create_exception!(
_directiva,
DirectiveError,
PyValueError,
"A directive failed to parse, or a `@file` failed to load."
);
struct PyTarget {
qualifier: Option<String>,
names: Vec<String>,
scopes: Vec<String>,
}
impl Target for PyTarget {
fn qualifier(&self) -> Option<&str> {
self.qualifier.as_deref()
}
fn matches_name(&self, p: &Pattern) -> bool {
self.names.iter().any(|n| p.matches(n))
}
fn matches_scope(&self, p: &Pattern) -> bool {
self.scopes.iter().any(|s| p.matches(s))
}
}
#[pyclass(name = "Directive", frozen)]
struct PyDirective {
inner: core::Directive<String>,
}
#[pymethods]
impl PyDirective {
#[getter]
fn action(&self) -> String {
self.inner.action.clone()
}
#[getter]
fn kind(&self) -> Option<String> {
self.inner.kind.clone()
}
#[getter]
fn name(&self) -> String {
self.inner.name.as_str().to_owned()
}
#[getter]
fn path(&self) -> Option<String> {
self.inner.path.as_ref().map(|p| p.as_str().to_owned())
}
#[getter]
fn note(&self) -> Option<String> {
self.inner.note.clone()
}
#[pyo3(signature = (names, *, qualifier=None, scopes=None))]
#[allow(clippy::needless_pass_by_value)]
fn matches(
&self,
names: Vec<String>,
qualifier: Option<String>,
scopes: Option<Vec<String>>,
) -> bool {
let t = PyTarget {
qualifier,
names,
scopes: scopes.unwrap_or_default(),
};
self.inner.matches(&t)
}
fn __str__(&self) -> String {
self.inner.to_string()
}
fn __repr__(&self) -> String {
format!("Directive({})", self.inner)
}
}
fn check_action(action: &str, actions: Option<&HashSet<String>>) -> PyResult<()> {
if let Some(allowed) = actions {
if !allowed.contains(action) {
let mut names: Vec<&str> = allowed.iter().map(String::as_str).collect();
names.sort_unstable();
return Err(DirectiveError::new_err(format!(
"unknown action {action:?}; allowed: {names:?}"
)));
}
}
Ok(())
}
#[pyfunction]
#[pyo3(signature = (spec, *, actions=None))]
#[allow(clippy::needless_pass_by_value)]
fn parse(spec: &str, actions: Option<HashSet<String>>) -> PyResult<PyDirective> {
let inner = core::parse(spec).map_err(|e| DirectiveError::new_err(e.to_string()))?;
check_action(&inner.action, actions.as_ref())?;
Ok(PyDirective { inner })
}
#[pyfunction]
fn glob_match(pattern: &str, text: &str) -> bool {
Pattern::compile(pattern).matches(text)
}
#[pyfunction]
#[pyo3(signature = (value, *, actions=None))]
#[allow(clippy::needless_pass_by_value)]
fn expand(value: &str, actions: Option<HashSet<String>>) -> PyResult<Vec<PyDirective>> {
let sourced =
cli::expand::<String>(value).map_err(|e| DirectiveError::new_err(e.to_string()))?;
let mut out = Vec::with_capacity(sourced.len());
for s in sourced {
check_action(&s.directive.action, actions.as_ref())?;
out.push(PyDirective { inner: s.directive });
}
Ok(out)
}
use crate::lint::{self, LintAction, Severity};
fn parse_severity(s: &str) -> PyResult<Severity> {
match s.to_ascii_lowercase().as_str() {
"error" => Ok(Severity::Error),
"warning" => Ok(Severity::Warning),
"info" => Ok(Severity::Info),
other => Err(DirectiveError::new_err(format!(
"unknown severity {other:?}; expected \"error\", \"warning\", or \"info\""
))),
}
}
fn severity_str(s: Severity) -> &'static str {
match s {
Severity::Error => "error",
Severity::Warning => "warning",
Severity::Info => "info",
}
}
fn to_lint_dirs(
py: Python<'_>,
directives: &[Py<PyDirective>],
) -> Vec<core::Directive<LintAction>> {
directives
.iter()
.filter_map(|d| {
let d = d.borrow(py);
LintAction::from_token(&d.inner.action).map(|action| core::Directive {
action,
kind: d.inner.kind.clone(),
name: d.inner.name.clone(),
path: d.inner.path.clone(),
note: d.inner.note.clone(),
})
})
.collect()
}
#[pyfunction]
#[pyo3(signature = (current, directives, names, *, qualifier=None, scopes=None))]
#[allow(clippy::needless_pass_by_value)]
fn lint_fold(
py: Python<'_>,
current: &str,
directives: Vec<Py<PyDirective>>,
names: Vec<String>,
qualifier: Option<String>,
scopes: Option<Vec<String>>,
) -> PyResult<(String, bool, Vec<String>)> {
let cur = parse_severity(current)?;
let target = PyTarget {
qualifier,
names,
scopes: scopes.unwrap_or_default(),
};
let f = lint::fold(cur, &target, &to_lint_dirs(py, &directives));
Ok((severity_str(f.severity).to_owned(), f.dropped, f.notes))
}
#[pyfunction]
#[allow(clippy::needless_pass_by_value)]
fn lint_settings(py: Python<'_>, directives: Vec<Py<PyDirective>>) -> Vec<(String, String)> {
lint::extract_settings(&to_lint_dirs(py, &directives))
}
#[pyclass(name = "Ladder", frozen)]
struct PyLadder {
inner: core::Ladder<String>,
}
#[pymethods]
impl PyLadder {
#[new]
#[allow(clippy::needless_pass_by_value)]
fn new(levels: Vec<String>) -> Self {
Self {
inner: core::Ladder::new(levels),
}
}
#[getter]
fn levels(&self) -> Vec<String> {
self.inner.levels().to_vec()
}
#[allow(clippy::needless_pass_by_value)]
fn position(&self, level: String) -> Option<usize> {
self.inner.position(&level)
}
#[allow(clippy::needless_pass_by_value)]
fn step(&self, level: String, delta: i32) -> String {
self.inner.step(&level, delta)
}
#[pyo3(signature = (current, directives, names, *, qualifier=None, scopes=None, steps=None, drop=None, notes=true))]
#[allow(
clippy::needless_pass_by_value,
clippy::too_many_arguments,
clippy::fn_params_excessive_bools
)]
fn fold(
&self,
py: Python<'_>,
current: String,
directives: Vec<Py<PyDirective>>,
names: Vec<String>,
qualifier: Option<String>,
scopes: Option<Vec<String>>,
steps: Option<HashMap<String, i32>>,
drop: Option<HashSet<String>>,
notes: bool,
) -> (String, bool, Vec<String>) {
let steps = steps.unwrap_or_default();
let drop = drop.unwrap_or_default();
let target = PyTarget {
qualifier,
names,
scopes: scopes.unwrap_or_default(),
};
let (mut delta, mut dropped, mut out) = (0i32, false, Vec::new());
for d in &directives {
let d = d.borrow(py);
if !d.inner.matches(&target) {
continue;
}
if notes {
if let Some(n) = &d.inner.note {
out.push(n.clone());
}
}
if let Some(s) = steps.get(&d.inner.action) {
delta += *s;
}
if drop.contains(&d.inner.action) {
dropped = true;
}
}
(self.inner.step(¤t, delta), dropped, out)
}
}
#[pymodule]
fn _directiva(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(parse, m)?)?;
m.add_function(wrap_pyfunction!(glob_match, m)?)?;
m.add_function(wrap_pyfunction!(expand, m)?)?;
m.add_function(wrap_pyfunction!(lint_fold, m)?)?;
m.add_function(wrap_pyfunction!(lint_settings, m)?)?;
m.add_class::<PyDirective>()?;
m.add_class::<PyLadder>()?;
m.add("DirectiveError", m.py().get_type::<DirectiveError>())?;
Ok(())
}