use std::path::Path;
use alint_core::{
Context, Error, Level, PerFileRule, Result, Rule, RuleSpec, Scope, Violation, eval_per_file,
};
use regex::Regex;
use serde::Deserialize;
/// Language preset selecting a built-in import-matching regex.
///
/// Deserialized from the lowercase variant name (`go`, `python`, `rust`,
/// `js`, `generic`) because of `rename_all = "lowercase"`.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
enum Language {
/// Go preset.
Go,
/// Python preset.
Python,
/// Rust preset.
Rust,
/// JavaScript preset.
Js,
/// No built-in pattern: `default_pattern` returns `None` for this variant,
/// so the caller has to provide the regex some other way.
Generic,
}
impl Language {
    /// Returns the built-in import regex for this preset, or `None` for
    /// [`Language::Generic`], which has no preset.
    ///
    /// Every pattern is meant to be applied to a single source line, and
    /// capture group 1 is the import target (module path / package string).
    fn default_pattern(self) -> Option<&'static str> {
        match self {
            // A double-quoted path, optionally preceded by `import` and/or an
            // alias (`_` or an identifier), optionally followed by a `//`
            // comment. Anchored at both ends so string literals in ordinary
            // statements do not match.
            Self::Go => {
                Some(r#"^\s*(?:import\s+)?(?:_\s+|[A-Za-z][\w.]*\s+)?"([^"]+)"\s*(?://.*)?$"#)
            }
            // `from X ...` or `import X`; captures the first dotted name.
            Self::Python => Some(r"^\s*(?:from|import)\s+([\w.]+)"),
            // `use path`, optionally prefixed by a visibility qualifier. The
            // qualifier may be bare `pub` or a restricted form such as
            // `pub(crate)`, `pub(super)` or `pub(in some::path)`; without the
            // parenthesised part, restricted re-exports would slip past the gate.
            Self::Rust => Some(r"^\s*(?:pub(?:\s*\([^)]*\))?\s+)?use\s+([\w:]+)"),
            // A quoted specifier following `from`, `require(`, `import(` or a
            // bare `import`. Not anchored, so it can match anywhere in a line.
            Self::Js => {
                Some(r#"(?:from\s*|require\s*\(\s*|import\s*\(\s*|import\s+)['"]([^'"]+)['"]"#)
            }
            Self::Generic => None,
        }
    }
}
/// Rule-specific options deserialized from the rule spec.
///
/// Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct Options {
/// Regex source; compiled in `build` and tested against each captured
/// import target. Required.
forbid: String,
/// Optional language preset supplying a default import pattern.
#[serde(default)]
language: Option<Language>,
/// Optional explicit import regex; when present it is used instead of the
/// `language` preset. Capture group 1 is read as the import target.
#[serde(default)]
import_pattern: Option<String>,
/// Patterns for files exempt from the gate; an empty list means no
/// exemptions.
#[serde(default)]
allow: Vec<String>,
}
/// Rule that flags import lines whose target matches a forbidden regex.
///
/// Instances are assembled by `build` from a `RuleSpec`.
#[derive(Debug)]
pub struct ImportGateRule {
// Copied from the rule spec (`id`, `level`, `policy_url`, `message`).
id: String,
level: Level,
policy_url: Option<String>,
// When set, used verbatim as the violation message instead of the
// generated "forbidden import ..." text.
message: Option<String>,
// Scope derived from the spec; returned by both `path_scope` methods.
scope: Scope,
// Original text of the `forbid` regex, kept for the default message.
forbid_src: String,
// Compiled `forbid` regex, tested against the captured import target.
forbid: Regex,
// Compiled import-matching regex; capture group 1 is the import target.
import_re: Regex,
// Files matching this scope are skipped entirely; `None` when the
// `allow` option was empty.
allow: Option<Scope>,
}
/// [`Rule`] implementation: delegates all real work to the per-file path.
impl Rule for ImportGateRule {
    // Expands to shared rule boilerplate defined in `alint_core`
    // (presumably the id/level/policy accessors — confirm against the macro).
    alint_core::rule_common_impl!();

    /// The scope this rule is limited to; always present for this rule.
    fn path_scope(&self) -> Option<&Scope> {
        let scope = &self.scope;
        Some(scope)
    }

    /// Exposes this rule through its [`PerFileRule`] interface.
    fn as_per_file(&self) -> Option<&dyn PerFileRule> {
        let per_file: &dyn PerFileRule = self;
        Some(per_file)
    }

    /// Evaluates the rule by handing it to `eval_per_file`.
    fn evaluate(&self, ctx: &Context<'_>) -> Result<Vec<Violation>> {
        eval_per_file(self, ctx)
    }
}
/// Per-file evaluation: scans a file line by line for forbidden imports.
impl PerFileRule for ImportGateRule {
    /// The scope of files this rule applies to.
    fn path_scope(&self) -> &Scope {
        &self.scope
    }

    /// Checks one file and returns a violation for every line that contains
    /// a forbidden import.
    ///
    /// * Files matched by the `allow` scope produce no violations.
    /// * Content that is not valid UTF-8 is skipped (no violations, no error).
    /// * Each line is searched with the import regex; capture group 1 of a
    ///   match is the import target, which is tested against `forbid`.
    /// * Every match on the line is considered, not just the first, so an
    ///   unanchored pattern cannot be bypassed by placing a harmless import
    ///   earlier on the same line. At most one violation is emitted per line,
    ///   for the first forbidden target found.
    ///
    /// The violation message is the rule's custom `message` when set,
    /// otherwise a generated text naming the target and the `forbid` regex.
    fn evaluate_file(
        &self,
        ctx: &Context<'_>,
        path: &Path,
        bytes: &[u8],
    ) -> Result<Vec<Violation>> {
        if self
            .allow
            .as_ref()
            .is_some_and(|a| a.matches(path, ctx.index))
        {
            return Ok(Vec::new());
        }
        let Ok(text) = std::str::from_utf8(bytes) else {
            return Ok(Vec::new());
        };

        let mut violations = Vec::new();
        for (i, line) in text.lines().enumerate() {
            // Walk all matches on the line; matches whose group 1 did not
            // participate are ignored, and the search stops at the first
            // target the `forbid` regex accepts.
            let forbidden = self
                .import_re
                .captures_iter(line)
                .filter_map(|caps| caps.get(1))
                .map(|m| m.as_str())
                .find(|target| self.forbid.is_match(target));
            let Some(target) = forbidden else {
                continue;
            };

            let msg = self.message.clone().unwrap_or_else(|| {
                format!(
                    "forbidden import {target:?} at this scope (matches /{}/)",
                    self.forbid_src
                )
            });
            violations.push(
                Violation::new(msg)
                    .with_path(std::sync::Arc::<Path>::from(path))
                    // `i` is zero-based, so `i + 1` is the one-based line number.
                    .with_location(i + 1, 1),
            );
        }
        Ok(violations)
    }
}
pub fn build(spec: &RuleSpec) -> Result<Box<dyn Rule>> {
if spec.paths.is_none() {
return Err(Error::rule_config(
&spec.id,
"import_gate requires a `paths` field (the scope the gate applies to)",
));
}
let opts: Options = spec
.deserialize_options()
.map_err(|e| Error::rule_config(&spec.id, format!("invalid options: {e}")))?;
let pattern_src: String = match (&opts.import_pattern, opts.language) {
(Some(p), _) => p.clone(),
(None, Some(lang)) => lang
.default_pattern()
.ok_or_else(|| {
Error::rule_config(
&spec.id,
"import_gate `language: generic` requires an explicit `import_pattern`",
)
})?
.to_string(),
(None, None) => {
return Err(Error::rule_config(
&spec.id,
"import_gate requires `language:` (go/python/rust/js) or `import_pattern:`",
));
}
};
let import_re = Regex::new(&pattern_src)
.map_err(|e| Error::rule_config(&spec.id, format!("invalid `import_pattern`: {e}")))?;
let forbid = Regex::new(&opts.forbid)
.map_err(|e| Error::rule_config(&spec.id, format!("invalid `forbid` regex: {e}")))?;
let allow = if opts.allow.is_empty() {
None
} else {
Some(
Scope::from_patterns(&opts.allow)
.map_err(|e| Error::rule_config(&spec.id, format!("invalid `allow` glob: {e}")))?,
)
};
Ok(Box::new(ImportGateRule {
id: spec.id.clone(),
level: spec.level,
policy_url: spec.policy_url.clone(),
message: spec.message.clone(),
scope: Scope::from_spec(spec)?,
forbid_src: opts.forbid,
forbid,
import_re,
allow,
}))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Assembles a rule straight from a language preset, without going
    /// through `build`. The rule scope is `**/*`; `allow` is optional.
    fn rule(language: Language, forbid: &str, allow: &[&str]) -> ImportGateRule {
        let preset = language.default_pattern().expect("preset has a pattern");
        let everything = Scope::from_patterns(&["**/*".to_string()]).unwrap();
        let forbid_re = Regex::new(forbid).unwrap();
        let import_re = Regex::new(preset).unwrap();
        let exempt = if allow.is_empty() {
            None
        } else {
            let globs: Vec<String> = allow.iter().map(|g| (*g).to_string()).collect();
            Some(Scope::from_patterns(&globs).unwrap())
        };
        ImportGateRule {
            id: "t".into(),
            level: Level::Error,
            policy_url: None,
            message: None,
            scope: everything,
            forbid_src: forbid.into(),
            forbid: forbid_re,
            import_re,
            allow: exempt,
        }
    }

    /// Runs `evaluate_file` on `src` as if it were the file at `path`, with
    /// an index that contains only that file.
    fn eval(r: &ImportGateRule, path: &str, src: &str) -> Vec<Violation> {
        let file = Path::new(path);
        let entries = vec![alint_core::FileEntry {
            path: file.into(),
            is_dir: false,
            size: 1,
        }];
        let idx = alint_core::FileIndex::from_entries(entries);
        let ctx = Context {
            root: Path::new("/"),
            index: &idx,
            registry: None,
            facts: None,
            vars: None,
            git_tracked: None,
            git_blame: None,
        };
        let outcome = r.evaluate_file(&ctx, file, src.as_bytes());
        outcome.unwrap()
    }

    /// Both a grouped and a single-line Go import of the forbidden package
    /// are flagged; the harmless `fmt` import is not.
    #[test]
    fn go_grouped_and_single_imports_are_gated() {
        let gate = rule(Language::Go, r"^k8s\.io/kubernetes/", &[]);
        let src = "package x\n\nimport (\n\t\"fmt\"\n\t\"k8s.io/kubernetes/pkg/api\"\n)\n\nimport \"k8s.io/kubernetes/cmd\"\n";
        let found = eval(&gate, "staging/a.go", src);
        assert_eq!(found.len(), 2, "both forbidden imports flagged: {found:?}");
        assert!(found[0].message.contains("k8s.io/kubernetes/pkg/api"));
        assert!(!found.iter().any(|x| x.message.contains("\"fmt\"")));
    }

    /// The forbid regex is applied to the captured target, not the raw line,
    /// so comments and string literals mentioning the package are ignored.
    #[test]
    fn target_not_raw_line_no_false_positive_on_comment() {
        let gate = rule(Language::Go, r"^k8s\.io/kubernetes/", &[]);
        let src = "package x\n// see k8s.io/kubernetes/pkg for context\nvar s = \"k8s.io/kubernetes/x is a path\"\n";
        let found = eval(&gate, "staging/a.go", src);
        assert!(found.is_empty());
    }

    /// A file under an `allow` glob is exempt; a sibling outside it is not.
    #[test]
    fn allow_glob_exempts_a_scoped_file() {
        let gate = rule(
            Language::Go,
            r"^k8s\.io/kubernetes/",
            &["staging/legacy/**"],
        );
        let src = "import \"k8s.io/kubernetes/pkg\"\n";
        let gated = eval(&gate, "staging/a.go", src);
        assert_eq!(gated.len(), 1);
        let exempted = eval(&gate, "staging/legacy/old.go", src);
        assert!(exempted.is_empty());
    }

    /// Python `from ... import` and plain `import` are both recognised.
    #[test]
    fn python_from_and_import_forms() {
        let gate = rule(Language::Python, r"^airflow\.providers", &[]);
        let src =
            "from airflow.providers.amazon import S3\nimport airflow.providers.google\nimport os\n";
        let found = eval(&gate, "airflow/core/x.py", src);
        assert_eq!(found.len(), 2, "{found:?}");

        let clean = eval(
            &gate,
            "airflow/core/x.py",
            "import os\nfrom airflow.models import DAG\n",
        );
        assert!(clean.is_empty());
    }

    /// Rust `use` paths are captured; only the forbidden one is reported.
    #[test]
    fn rust_use_paths() {
        let gate = rule(Language::Rust, r"^crate::secrets", &[]);
        let src = "use crate::secrets::Key;\npub use std::process::Command;\n";
        let found = eval(&gate, "src/a.rs", src);
        assert_eq!(found.len(), 1);
        assert!(found[0].message.contains("crate::secrets"));
    }

    /// JS `import ... from` and `require(...)` are both recognised.
    #[test]
    fn js_import_and_require() {
        let gate = rule(Language::Js, r"^lodash", &[]);
        let src = "import _ from \"lodash\";\nconst x = require('lodash/fp');\nimport y from \"react\";\n";
        let found = eval(&gate, "src/a.js", src);
        assert_eq!(found.len(), 2);
    }

    /// `build` rejects `language: generic` without a pattern, and rejects an
    /// uncompilable `forbid` regex; each error mentions the offending option.
    #[test]
    fn build_errors_on_generic_without_pattern_and_bad_regex() {
        let cases = [
            (
                "id: t\nkind: import_gate\npaths: \"**/*\"\nlanguage: generic\nforbid: x\nlevel: error\n",
                "generic",
            ),
            (
                "id: t\nkind: import_gate\npaths: \"**/*\"\nlanguage: rust\nforbid: \"[\"\nlevel: error\n",
                "forbid",
            ),
        ];
        for (yaml, needle) in cases {
            let spec = crate::test_support::spec_yaml(yaml);
            let rendered = build(&spec).unwrap_err().to_string();
            assert!(rendered.contains(needle));
        }
    }
}