use crate::analyzer::kubelint::checks::builtin_checks;
use crate::analyzer::kubelint::config::{CheckSpec, KubelintConfig};
use crate::analyzer::kubelint::context::{LintContext, LintContextImpl};
use crate::analyzer::kubelint::parser::{helm, kustomize, yaml};
use crate::analyzer::kubelint::pragma::should_ignore_check;
use crate::analyzer::kubelint::types::{CheckFailure, Severity};
use std::path::Path;
/// Outcome of a lint run: the check failures that were found, any messages
/// produced while loading/parsing the input, and aggregate counters.
#[derive(Debug, Clone)]
pub struct LintResult {
    /// Failures reported by the checks that were run.
    pub failures: Vec<CheckFailure>,
    /// Human-readable messages for input that could not be loaded or parsed.
    pub parse_errors: Vec<String>,
    /// Aggregate counters and the overall pass/fail flag for the run.
    pub summary: LintSummary,
}
/// Aggregate figures describing a lint run.
#[derive(Debug, Clone)]
pub struct LintSummary {
    /// Number of objects held by the lint context when the checks ran.
    pub objects_analyzed: usize,
    /// Number of checks that were resolved to run for this configuration.
    pub checks_run: usize,
    /// `true` unless the run should be treated as failed for the given
    /// configuration (starts out as `true` for an empty result).
    pub passed: bool,
}
impl LintResult {
pub fn new() -> Self {
Self {
failures: Vec::new(),
parse_errors: Vec::new(),
summary: LintSummary {
objects_analyzed: 0,
checks_run: 0,
passed: true,
},
}
}
pub fn has_failures(&self) -> bool {
!self.failures.is_empty()
}
pub fn has_errors(&self) -> bool {
self.failures.iter().any(|f| f.severity == Severity::Error)
}
pub fn has_warnings(&self) -> bool {
self.failures
.iter()
.any(|f| f.severity == Severity::Warning)
}
pub fn max_severity(&self) -> Option<Severity> {
self.failures.iter().map(|f| f.severity).max()
}
pub fn should_fail(&self, config: &KubelintConfig) -> bool {
if config.no_fail {
return false;
}
if let Some(max) = self.max_severity() {
max >= config.failure_threshold
} else {
false
}
}
pub fn filter_by_threshold(&mut self, threshold: Severity) {
self.failures.retain(|f| f.severity >= threshold);
}
pub fn sort(&mut self) {
self.failures.sort();
}
}
/// `LintResult::default()` is the same empty result as `LintResult::new()`.
impl Default for LintResult {
    fn default() -> Self {
        Self::new()
    }
}
/// Lints whatever lives at `path` (a file, a directory, a Helm chart or a
/// Kustomize directory) using `config`.
///
/// * If `config.should_ignore_path(path)` is true, an empty result is returned.
/// * If the input cannot be loaded, the returned result carries the error
///   message in `parse_errors` and no checks are run.
/// * Otherwise the checks are run, and any non-fatal loader warning (for
///   example the Helm "falling back to raw template parsing" notice) is
///   appended to `parse_errors` of the returned result.
pub fn lint(path: &Path, config: &KubelintConfig) -> LintResult {
    if config.should_ignore_path(path) {
        return LintResult::new();
    }

    let (ctx, warning) = match load_context(path, config) {
        Ok(loaded) => loaded,
        Err(err) => {
            let mut failed = LintResult::new();
            failed.parse_errors.push(err);
            return failed;
        }
    };

    let mut result = run_checks(&ctx, config);

    // Attach the loader warning only AFTER the checks have produced their
    // result: `run_checks` builds a fresh `LintResult`, so anything recorded
    // before this point would be overwritten and lost.
    if let Some(warn) = warning {
        result.parse_errors.push(warn);
    }

    result
}
/// Lints a single path; this simply forwards to [`lint`] with the same
/// arguments and returns its result unchanged.
pub fn lint_file(path: &Path, config: &KubelintConfig) -> LintResult {
    lint(path, config)
}
/// Lints YAML held in memory rather than read from disk.
///
/// `content` is parsed with `yaml::parse_yaml`. On a parse error the returned
/// result contains only that error (as text) in `parse_errors` and no checks
/// are run; otherwise every parsed object is added to a fresh context and the
/// configured checks are run against it.
pub fn lint_content(content: &str, config: &KubelintConfig) -> LintResult {
    let mut ctx = LintContextImpl::new();

    let parsed = match yaml::parse_yaml(content) {
        Ok(parsed) => parsed,
        Err(err) => {
            let mut failed = LintResult::new();
            failed.parse_errors.push(err.to_string());
            return failed;
        }
    };

    for obj in parsed {
        ctx.add_object(obj);
    }

    run_checks(&ctx, config)
}
/// Loads every object reachable from `path` into a new lint context.
///
/// The kind of input is detected in this order:
/// 1. Helm chart — rendered with `helm::render_helm_chart`. If rendering
///    fails and a `templates` sub-directory exists, the raw templates are
///    parsed as YAML instead and a warning describing the fallback is returned
///    alongside the context.
/// 2. Kustomize directory — rendered with `kustomize::render_kustomize`.
/// 3. Any other directory — walked by `load_directory_with_rendering`.
/// 4. Anything else — parsed as a single YAML file.
///
/// Returns the populated context plus an optional non-fatal warning, or an
/// error message when the input could not be loaded at all. `_config` is
/// currently unused.
fn load_context(
    path: &Path,
    _config: &KubelintConfig,
) -> Result<(LintContextImpl, Option<String>), String> {
    let mut ctx = LintContextImpl::new();

    if helm::is_helm_chart(path) {
        let render_err = match helm::render_helm_chart(path, None) {
            Ok(rendered) => {
                for obj in rendered {
                    ctx.add_object(obj);
                }
                return Ok((ctx, None));
            }
            Err(err) => err,
        };

        // Rendering failed: the only remaining option is to parse the raw
        // template files, which requires a `templates` directory.
        let templates_dir = path.join("templates");
        if !templates_dir.exists() {
            return Err(format!("Failed to render Helm chart: {}", render_err));
        }

        let fallback_warning = format!(
            "Helm render failed ({}), falling back to raw template parsing",
            render_err
        );
        return match yaml::parse_yaml_dir(&templates_dir) {
            Ok(raw_objects) => {
                for obj in raw_objects {
                    ctx.add_object(obj);
                }
                Ok((ctx, Some(fallback_warning)))
            }
            Err(yaml_err) => Err(format!(
                "Failed to render Helm chart: {}. Fallback YAML parsing also failed: {}",
                render_err, yaml_err
            )),
        };
    }

    if kustomize::is_kustomize_dir(path) {
        let rendered = kustomize::render_kustomize(path)
            .map_err(|err| format!("Failed to render Kustomize: {}", err))?;
        for obj in rendered {
            ctx.add_object(obj);
        }
    } else if path.is_dir() {
        load_directory_with_rendering(&mut ctx, path)?;
    } else {
        let parsed = yaml::parse_yaml_file(path)
            .map_err(|err| format!("Failed to parse YAML file: {}", err))?;
        for obj in parsed {
            ctx.add_object(obj);
        }
    }

    Ok((ctx, None))
}
/// Recursively loads a plain directory into `ctx`, rendering any Helm charts
/// or Kustomize directories found inside it.
///
/// Works in two passes over the tree (symlinks are followed, unreadable
/// entries are skipped):
/// 1. every directory recognised as a Helm chart or Kustomize directory is
///    rendered and remembered as "handled";
/// 2. every `.yaml` / `.yml` file that is not located under a handled
///    directory is parsed directly.
///
/// The function is best-effort: render and parse failures are ignored, so it
/// currently always returns `Ok(())`.
fn load_directory_with_rendering(ctx: &mut LintContextImpl, path: &Path) -> Result<(), String> {
    use std::collections::HashSet;
    use std::path::PathBuf;

    let mut rendered_roots: HashSet<PathBuf> = HashSet::new();

    // Pass 1: render chart / kustomization directories.
    // NOTE(review): the walk still descends into a directory after it has been
    // rendered, so a chart nested inside another chart is rendered on its own
    // as well — confirm this is intended.
    for dir_entry in walkdir::WalkDir::new(path)
        .follow_links(true)
        .into_iter()
        .filter_map(Result::ok)
    {
        let dir = dir_entry.path();
        if !dir.is_dir() {
            continue;
        }

        if helm::is_helm_chart(dir) {
            if let Ok(rendered) = helm::render_helm_chart(dir, None) {
                for obj in rendered {
                    ctx.add_object(obj);
                }
            }
            // Marked as handled even when rendering failed, so that the raw
            // files below it are not parsed in pass 2.
            rendered_roots.insert(dir.to_path_buf());
        } else if kustomize::is_kustomize_dir(dir) {
            if let Ok(rendered) = kustomize::render_kustomize(dir) {
                for obj in rendered {
                    ctx.add_object(obj);
                }
            }
            rendered_roots.insert(dir.to_path_buf());
        }
    }

    // Pass 2: parse the remaining stand-alone YAML files.
    for file_entry in walkdir::WalkDir::new(path)
        .follow_links(true)
        .into_iter()
        .filter_map(Result::ok)
    {
        let file = file_entry.path();
        if !file.is_file() {
            continue;
        }

        let already_rendered = rendered_roots.iter().any(|root| file.starts_with(root));
        if already_rendered {
            continue;
        }

        let is_yaml = matches!(
            file.extension().and_then(|ext| ext.to_str()),
            Some("yaml" | "yml")
        );
        if !is_yaml {
            continue;
        }

        if let Ok(parsed) = yaml::parse_yaml_file(file) {
            for obj in parsed {
                ctx.add_object(obj);
            }
        }
    }

    Ok(())
}
/// Runs the configured checks against every object in `ctx` and assembles the
/// final report.
///
/// Steps, in order:
/// 1. resolve the checks to run via `config.resolve_checks`;
/// 2. instantiate each check's template — checks whose template is not found
///    are skipped, instantiation errors are printed to stderr as warnings;
/// 3. for every object, run each check whose scope matches the object's kind
///    and that `should_ignore_check` does not suppress for that object;
/// 4. drop failures below `config.failure_threshold`, sort the rest and set
///    `summary.passed`.
///
/// Every diagnostic is recorded with `Severity::Warning`. `summary.checks_run`
/// counts the resolved checks, including any that failed to instantiate.
fn run_checks(ctx: &LintContextImpl, config: &KubelintConfig) -> LintResult {
    use crate::analyzer::kubelint::templates;
    use crate::analyzer::kubelint::types::CheckFailure;
    use std::collections::HashMap;

    let builtins = builtin_checks();

    // NOTE(review): this merged built-in + custom list is never read below;
    // `resolve_checks` is handed only the built-ins. Confirm whether custom
    // checks are picked up inside `resolve_checks` or should be passed here.
    let mut known_checks: Vec<&CheckSpec> = builtins.iter().collect();
    for custom in &config.custom_checks {
        known_checks.push(custom);
    }

    let selected = config.resolve_checks(&builtins);

    let mut report = LintResult::new();
    report.summary.objects_analyzed = ctx.objects().len();
    report.summary.checks_run = selected.len();

    // Instantiate each selected check once, keyed by check name.
    let mut instantiated: HashMap<String, Box<dyn templates::CheckFunc>> = HashMap::new();
    for spec in &selected {
        let Some(template) = templates::get_template(&spec.template) else {
            continue;
        };
        match template.instantiate(&spec.params) {
            Ok(func) => {
                instantiated.insert(spec.name.clone(), func);
            }
            Err(e) => {
                eprintln!(
                    "Warning: Failed to instantiate check '{}': {}",
                    spec.name, e
                );
            }
        }
    }

    for obj in ctx.objects() {
        for spec in &selected {
            // Scope is tested first; the ignore lookup only runs for checks
            // that apply to this kind of object.
            let applies = spec.scope.object_kinds.matches(&obj.kind())
                && !should_ignore_check(obj, &spec.name);
            if !applies {
                continue;
            }

            let Some(func) = instantiated.get(&spec.name) else {
                continue;
            };

            for diag in func.check(obj) {
                let mut failure = CheckFailure::new(
                    spec.name.as_str(),
                    Severity::Warning,
                    &diag.message,
                    &obj.metadata.file_path,
                    obj.name(),
                    obj.kind().as_str(),
                );
                if let Some(ns) = obj.namespace() {
                    failure = failure.with_namespace(ns);
                }
                if let Some(line) = obj.metadata.line_number {
                    failure = failure.with_line(line);
                }
                if let Some(remediation) = diag.remediation {
                    failure = failure.with_remediation(remediation);
                }
                report.failures.push(failure);
            }
        }
    }

    report.filter_by_threshold(config.failure_threshold);
    report.sort();
    report.summary.passed = !report.should_fail(config);
    report
}
// Unit tests for the lint entry points and the `LintResult` helpers.
#[cfg(test)]
mod tests {
    use super::*;
    // A freshly created result is empty and marked as passed.
    #[test]
    fn test_lint_result_new() {
        let result = LintResult::new();
        assert!(result.failures.is_empty());
        assert!(result.parse_errors.is_empty());
        assert!(result.summary.passed);
    }
    // Linting empty content with the default config yields no failures.
    #[test]
    fn test_lint_content_empty() {
        let result = lint_content("", &KubelintConfig::default());
        assert!(result.failures.is_empty());
    }
    // `should_fail` compares the worst recorded severity with the configured
    // threshold and is overridden by `no_fail`.
    #[test]
    fn test_should_fail() {
        let mut result = LintResult::new();
        result.failures.push(CheckFailure::new(
            "test-check",
            Severity::Warning,
            "test message",
            "test.yaml",
            "test-obj",
            "Deployment",
        ));
        // A warning reaches a Warning threshold...
        let config = KubelintConfig::default().with_threshold(Severity::Warning);
        assert!(result.should_fail(&config));
        // ...but not an Error threshold.
        let config = KubelintConfig::default().with_threshold(Severity::Error);
        assert!(!result.should_fail(&config));
        // `no_fail` suppresses failure regardless of what was recorded.
        let mut no_fail_config = KubelintConfig::default();
        no_fail_config.no_fail = true;
        assert!(!result.should_fail(&no_fail_config));
    }
    // Lints an on-disk fixture both via `lint_content` and `lint_file`,
    // printing diagnostics along the way. The test returns early (and thus
    // passes) when the fixture file is not present.
    #[test]
    fn test_lint_real_file() {
        let test_file = std::path::Path::new("test-lint/k8s/insecure-deployment.yaml");
        if !test_file.exists() {
            eprintln!("Test file not found, skipping: {:?}", test_file);
            return;
        }
        let content = std::fs::read_to_string(test_file).unwrap();
        println!("=== File Content ===\n{}\n", content);
        let config = KubelintConfig::default().with_all_builtin();
        println!("=== Config ===");
        println!("add_all_builtin: {}", config.add_all_builtin);
        // Path 1: lint the file's text directly.
        let result_content = lint_content(&content, &config);
        println!("\n=== Lint Content Result ===");
        println!(
            "Objects analyzed: {}",
            result_content.summary.objects_analyzed
        );
        println!("Checks run: {}", result_content.summary.checks_run);
        println!("Failures: {}", result_content.failures.len());
        for f in &result_content.failures {
            println!(" - {} [{:?}]: {}", f.code, f.severity, f.message);
        }
        for e in &result_content.parse_errors {
            println!(" Parse error: {}", e);
        }
        // Path 2: lint the same file through the path-based entry point.
        let result_file = lint_file(test_file, &config);
        println!("\n=== Lint File Result ===");
        println!("Objects analyzed: {}", result_file.summary.objects_analyzed);
        println!("Checks run: {}", result_file.summary.checks_run);
        println!("Failures: {}", result_file.failures.len());
        for f in &result_file.failures {
            println!(" - {} [{:?}]: {}", f.code, f.severity, f.message);
        }
        for e in &result_file.parse_errors {
            println!(" Parse error: {}", e);
        }
        // At least one of the two paths must report something.
        assert!(
            result_content.has_failures() || result_file.has_failures(),
            "Expected to find security issues in the test file!"
        );
    }
    // An inline Deployment with a privileged container and a `latest` image
    // tag must trigger the `privileged-container` and `latest-tag` checks.
    #[test]
    fn test_lint_content_finds_issues() {
        let yaml = r#"
apiVersion: apps/v1
kind: Deployment
metadata:
name: insecure-deploy
spec:
replicas: 1
selector:
matchLabels:
app: test
template:
spec:
containers:
- name: nginx
image: nginx:latest
securityContext:
privileged: true
"#;
        let config = KubelintConfig::default().with_all_builtin();
        let result = lint_content(yaml, &config);
        assert!(
            result.has_failures(),
            "Expected linting failures for insecure deployment"
        );
        let privileged_failures: Vec<_> = result
            .failures
            .iter()
            .filter(|f| f.code.as_str() == "privileged-container")
            .collect();
        assert!(
            !privileged_failures.is_empty(),
            "Should detect privileged container"
        );
        let latest_tag_failures: Vec<_> = result
            .failures
            .iter()
            .filter(|f| f.code.as_str() == "latest-tag")
            .collect();
        assert!(!latest_tag_failures.is_empty(), "Should detect latest tag");
    }
    // A hardened Deployment (pinned image tag, non-privileged container) must
    // not produce `privileged-container` or `latest-tag` failures.
    #[test]
    fn test_lint_content_secure_deployment() {
        let yaml = r#"
apiVersion: apps/v1
kind: Deployment
metadata:
name: secure-deploy
spec:
replicas: 1
selector:
matchLabels:
app: test
template:
spec:
serviceAccountName: my-service-account
securityContext:
runAsNonRoot: true
containers:
- name: nginx
image: nginx:1.21.0
securityContext:
privileged: false
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
livenessProbe:
httpGet:
path: /healthz
port: 8080
readinessProbe:
httpGet:
path: /ready
port: 8080
"#;
        // The config names the two checks of interest via `include`.
        let config = KubelintConfig::default()
            .include("privileged-container")
            .include("latest-tag");
        let result = lint_content(yaml, &config);
        let critical_failures: Vec<_> = result
            .failures
            .iter()
            .filter(|f| {
                f.code.as_str() == "privileged-container" || f.code.as_str() == "latest-tag"
            })
            .collect();
        assert!(
            critical_failures.is_empty(),
            "Secure deployment should not have privileged/latest-tag failures: {:?}",
            critical_failures
        );
    }
    // An `ignore-check.kube-linter.io/<check>` annotation on the object must
    // suppress that check even though the container is privileged.
    #[test]
    fn test_lint_content_with_ignore_annotation() {
        let yaml = r#"
apiVersion: apps/v1
kind: Deployment
metadata:
name: ignored-deploy
annotations:
ignore-check.kube-linter.io/privileged-container: "intentionally privileged"
spec:
replicas: 1
selector:
matchLabels:
app: test
template:
spec:
containers:
- name: nginx
image: nginx:1.21.0
securityContext:
privileged: true
"#;
        let config = KubelintConfig::default().include("privileged-container");
        let result = lint_content(yaml, &config);
        let privileged_failures: Vec<_> = result
            .failures
            .iter()
            .filter(|f| f.code.as_str() == "privileged-container")
            .collect();
        assert!(
            privileged_failures.is_empty(),
            "Ignored check should not produce failures"
        );
    }
}