import json
import re
import sys
from pathlib import Path
from typing import List, Tuple, Dict
class PromQLValidator:
def __init__(self):
self.errors = []
self.warnings = []
def validate_expression(self, expr: str, context: str) -> bool:
expr = expr.strip()
if not expr:
self.errors.append(f"{context}: Empty expression")
return False
if not self._check_balanced_parens(expr):
self.errors.append(f"{context}: Unbalanced parentheses in: {expr}")
return False
if not self._check_functions(expr):
return False
if not self._check_rate_increase(expr, context):
return False
if not self._check_histogram_quantile(expr, context):
return False
if not self._check_label_matchers(expr, context):
return False
return True
def _check_balanced_parens(self, expr: str) -> bool:
stack = []
pairs = {'(': ')', '[': ']', '{': '}'}
for char in expr:
if char in pairs:
stack.append(char)
elif char in pairs.values():
if not stack or pairs[stack.pop()] != char:
return False
return len(stack) == 0
def _check_functions(self, expr: str) -> bool:
valid_functions = {
'rate', 'irate', 'increase', 'sum', 'avg', 'min', 'max', 'count',
'stddev', 'stdvar', 'topk', 'bottomk', 'histogram_quantile',
'abs', 'ceil', 'floor', 'round', 'sqrt', 'exp', 'ln', 'log2', 'log10',
'deriv', 'predict_linear', 'delta', 'idelta', 'changes',
'sort', 'sort_desc', 'clamp_max', 'clamp_min', 'time', 'timestamp'
}
function_pattern = r'\b([a-z_]+)\s*\('
functions_used = re.findall(function_pattern, expr)
for func in functions_used:
if func not in valid_functions:
self.warnings.append(f"Unknown function: {func} (might be valid, please verify)")
return True
def _check_rate_increase(self, expr: str, context: str) -> bool:
rate_pattern = r'(rate|irate|increase)\s*\(\s*([^)]+)\)'
matches = re.finditer(rate_pattern, expr)
for match in matches:
func = match.group(1)
arg = match.group(2)
if '[' not in arg:
self.errors.append(
f"{context}: {func}() requires a range vector, found: {func}({arg})"
)
return False
if not re.search(r'\[\w+\]', arg):
self.errors.append(
f"{context}: Invalid range selector in: {func}({arg})"
)
return False
return True
def _check_histogram_quantile(self, expr: str, context: str) -> bool:
hq_pattern = r'histogram_quantile\s*\(\s*([^,]+),\s*(.+)\)'
matches = re.finditer(hq_pattern, expr)
for match in matches:
quantile = match.group(1).strip()
vector_expr = match.group(2).strip()
try:
q = float(quantile)
if not 0 <= q <= 1:
self.errors.append(
f"{context}: histogram_quantile quantile must be between 0 and 1, got: {q}"
)
return False
except ValueError:
pass
if '_bucket' not in vector_expr:
self.warnings.append(
f"{context}: histogram_quantile should use _bucket metrics, found: {vector_expr}"
)
return True
def _check_label_matchers(self, expr: str, context: str) -> bool:
matcher_pattern = r'\{([^}]+)\}'
matches = re.findall(matcher_pattern, expr)
for matcher in matches:
parts = re.split(r'[=!~]+', matcher)
if len(parts) < 2:
self.errors.append(
f"{context}: Invalid label matcher syntax: {{{matcher}}}"
)
return False
return True
def extract_and_validate_queries(dashboard_dir: Path) -> Tuple[bool, List[str], List[str]]:
validator = PromQLValidator()
all_queries: Dict[str, List[str]] = {}
for dashboard_file in dashboard_dir.glob("*.json"):
with open(dashboard_file, 'r') as f:
data = json.load(f)
queries = []
def find_queries(obj, path=""):
if isinstance(obj, dict):
if 'expr' in obj and 'targets' not in path:
expr = obj['expr']
context = f"{dashboard_file.name} - {path}"
queries.append((expr, context))
validator.validate_expression(expr, context)
for key, value in obj.items():
find_queries(value, f"{path}.{key}" if path else key)
elif isinstance(obj, list):
for i, item in enumerate(obj):
find_queries(item, f"{path}[{i}]")
find_queries(data)
all_queries[dashboard_file.name] = [q[0] for q in queries]
return len(validator.errors) == 0, validator.errors, validator.warnings
def main():
repo_root = Path(__file__).parent.parent
dashboard_dir = repo_root / "monitoring" / "grafana" / "dashboards"
if not dashboard_dir.exists():
print(f"[ERROR] Dashboard directory not found: {dashboard_dir}")
sys.exit(1)
print("=" * 80)
print("PROMQL SYNTAX VALIDATION REPORT")
print("=" * 80)
print()
valid, errors, warnings = extract_and_validate_queries(dashboard_dir)
if errors:
print("ERRORS:")
print("-" * 80)
for error in errors:
print(f" [X] {error}")
print()
if warnings:
print("WARNINGS:")
print("-" * 80)
for warning in warnings:
print(f" [!] {warning}")
print()
print("=" * 80)
if valid:
if warnings:
print(f"[OK] SUCCESS: All PromQL queries are valid ({len(warnings)} warnings)")
else:
print("[OK] SUCCESS: All PromQL queries are valid (no warnings)")
else:
print(f"[ERROR] FAILURE: Found {len(errors)} PromQL syntax errors")
print("=" * 80)
print()
sys.exit(0 if valid else 1)
if __name__ == "__main__":
main()