import argparse
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import date, datetime
from pathlib import Path
LOG_DIR = Path(os.environ.get("BATLESS_LOG_DIR", Path.home() / ".batless" / "stats"))
def _supports_color() -> bool:
if os.environ.get("NO_COLOR"):
return False
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
_COLOR = _supports_color()
class C:
RESET = "\033[0m" if _COLOR else ""
BOLD = "\033[1m" if _COLOR else ""
DIM = "\033[2m" if _COLOR else ""
RED = "\033[31m" if _COLOR else ""
GREEN = "\033[32m" if _COLOR else ""
YELLOW = "\033[33m" if _COLOR else ""
CYAN = "\033[36m" if _COLOR else ""
def load_file(path: Path) -> list[dict]:
entries = []
try:
with open(path) as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
pass
except FileNotFoundError:
pass
return entries
def load_logs(args) -> list[dict]:
if args.all:
files = sorted(LOG_DIR.glob("*.ndjson"))
elif args.date:
files = [LOG_DIR / f"{args.date}.ndjson"]
else:
files = [LOG_DIR / f"{date.today().isoformat()}.ndjson"]
entries = []
for f in files:
entries.extend(load_file(f))
if args.session:
entries = [e for e in entries if e.get("session") == args.session]
return entries
def build_signature(e: dict) -> str:
parts = ["batless"]
profile = e.get("profile")
if profile:
parts.append(f"--profile={profile}")
mode = e.get("mode", "default")
if mode not in ("default", ""):
parts.append(f"--mode={mode}")
max_lines = e.get("max_lines")
if max_lines:
parts.append(f"--max-lines={max_lines}")
max_bytes = e.get("max_bytes")
if max_bytes:
parts.append(f"--max-bytes={max_bytes}")
for flag in sorted(e.get("flags", [])):
parts.append(flag)
for flag in sorted(e.get("extra_flags", [])):
parts.append(flag)
return " ".join(parts)
def analyse(entries: list[dict]) -> dict:
if not entries:
return {"total_calls": 0}
total = len(entries)
modes = Counter(e.get("mode", "default") for e in entries)
profiles = Counter(e.get("profile") or "none" for e in entries)
sessions = Counter(e.get("session", "unknown") for e in entries)
file_exts = Counter()
flags_all = Counter()
extra_flags = Counter()
has_limit = sum(1 for e in entries if e.get("max_lines") or e.get("max_bytes"))
has_lines = sum(1 for e in entries if e.get("max_lines"))
has_bytes = sum(1 for e in entries if e.get("max_bytes"))
files_viewed = Counter()
command_sigs = Counter()
for e in entries:
for ext in e.get("file_exts", []):
file_exts[ext] += 1
for f in e.get("flags", []):
flags_all[f] += 1
for f in e.get("extra_flags", []):
extra_flags[f] += 1
for f in e.get("files", []):
files_viewed[os.path.basename(f)] += 1
command_sigs[build_signature(e)] += 1
hourly = Counter()
for e in entries:
try:
dt = datetime.fromisoformat(e["ts"].replace("Z", "+00:00"))
hourly[dt.hour] += 1
except Exception:
pass
max_lines_vals = [e["max_lines"] for e in entries if e.get("max_lines")]
max_bytes_vals = [e["max_bytes"] for e in entries if e.get("max_bytes")]
return {
"total_calls": total,
"unique_sessions": len(sessions),
"modes": dict(modes.most_common()),
"profiles": dict(profiles.most_common()),
"calls_with_limit": has_limit,
"calls_with_max_lines": has_lines,
"calls_with_max_bytes": has_bytes,
"avg_max_lines": round(sum(max_lines_vals) / len(max_lines_vals), 1) if max_lines_vals else None,
"avg_max_bytes": round(sum(max_bytes_vals) / len(max_bytes_vals), 1) if max_bytes_vals else None,
"top_flags": dict(flags_all.most_common(10)),
"top_extra_flags": dict(extra_flags.most_common(5)),
"top_file_exts": dict(file_exts.most_common(10)),
"top_files_viewed": dict(files_viewed.most_common(10)),
"sessions": dict(sessions.most_common()),
"hourly_distribution": {str(h): c for h, c in sorted(hourly.items())},
"command_signatures": dict(command_sigs.most_common()),
}
def bar(value: int, total: int, width: int = 20) -> str:
filled = round(value / total * width) if total else 0
return f"{C.GREEN}{'█' * filled}{C.RESET}{C.DIM}{'░' * (width - filled)}{C.RESET}"
def print_stats(stats: dict) -> None:
total = stats["total_calls"]
if total == 0:
print("No batless calls found in the selected log(s).")
return
def pct(n): return f"{C.CYAN}{n/total*100:5.1f}%{C.RESET}"
div = f"{C.BOLD}{C.CYAN}{'━' * 56}{C.RESET}"
def sec(s): return f" {C.BOLD}{C.YELLOW}{s}{C.RESET}"
print()
print(div)
print(f" {C.BOLD}batless usage statistics{C.RESET}")
print(div)
print(f" Total calls : {C.BOLD}{total}{C.RESET}")
print(f" Unique sessions : {C.BOLD}{stats['unique_sessions']}{C.RESET}")
print()
print(sec("── Output modes ──────────────────────────────────────"))
for mode, count in stats["modes"].items():
print(f" {mode:<16} {C.BOLD}{count:>4}{C.RESET} {bar(count, total)} {pct(count)}")
print()
print(sec("── AI profiles ───────────────────────────────────────"))
for profile, count in stats["profiles"].items():
print(f" {profile:<16} {C.BOLD}{count:>4}{C.RESET} {bar(count, total)} {pct(count)}")
print()
print(sec("── Output limiting ───────────────────────────────────"))
print(f" With any limit : {C.BOLD}{stats['calls_with_limit']:>4}{C.RESET} {pct(stats['calls_with_limit'])}")
print(f" --max-lines : {C.BOLD}{stats['calls_with_max_lines']:>4}{C.RESET} {pct(stats['calls_with_max_lines'])}")
print(f" --max-bytes : {C.BOLD}{stats['calls_with_max_bytes']:>4}{C.RESET} {pct(stats['calls_with_max_bytes'])}")
if stats["avg_max_lines"]:
print(f" avg max-lines : {C.BOLD}{stats['avg_max_lines']}{C.RESET}")
if stats["avg_max_bytes"]:
print(f" avg max-bytes : {C.BOLD}{stats['avg_max_bytes']}{C.RESET}")
print()
if stats.get("command_signatures"):
print(sec("── Unique command signatures ──────────────────────────"))
for sig, count in stats["command_signatures"].items():
print(f" {sig} {C.BOLD}{count}{C.RESET} {pct(count)}")
print()
if stats["top_flags"]:
print(sec("── Top flags ─────────────────────────────────────────"))
for flag, count in stats["top_flags"].items():
print(f" {flag:<24} {C.BOLD}{count:>4}{C.RESET} {pct(count)}")
print()
if stats["top_file_exts"]:
print(sec("── File extensions ───────────────────────────────────"))
for ext, count in stats["top_file_exts"].items():
print(f" {ext:<16} {C.BOLD}{count:>4}{C.RESET} {bar(count, total)} {pct(count)}")
print()
if stats["top_files_viewed"]:
print(sec("── Most-viewed files ─────────────────────────────────"))
for fname, count in stats["top_files_viewed"].items():
print(f" {fname:<30} {C.BOLD}{count:>4}{C.RESET}")
print()
if stats["hourly_distribution"]:
print(sec("── Hourly distribution (UTC) ─────────────────────────"))
max_h = max(stats["hourly_distribution"].values())
for hour, count in sorted(stats["hourly_distribution"].items(), key=lambda x: int(x[0])):
print(f" {int(hour):02d}:00 {bar(count, max_h, 16)} {C.BOLD}{count}{C.RESET}")
print()
print(div)
print()
def analyse_errors(entries: list[dict]) -> dict:
if not entries:
return {"total_errors": 0}
total = len(entries)
exit_codes = Counter(str(e.get("exit_code", "unknown")) for e in entries)
stderrs = Counter(e.get("stderr", "")[:120] for e in entries)
sessions = Counter(e.get("session", "unknown") for e in entries)
flag_patterns = Counter()
for e in entries:
flags = sorted(a for a in e.get("args", []) if a.startswith("-"))
flag_patterns[" ".join(flags) or "(no flags)"] += 1
hourly = Counter()
for e in entries:
try:
dt = datetime.fromisoformat(e["ts"].replace("Z", "+00:00"))
hourly[dt.hour] += 1
except Exception:
pass
return {
"total_errors": total,
"unique_sessions": len(sessions),
"exit_codes": dict(exit_codes.most_common()),
"top_stderrs": dict(stderrs.most_common(10)),
"top_flag_patterns": dict(flag_patterns.most_common(10)),
"hourly_distribution": {str(h): c for h, c in sorted(hourly.items())},
}
def print_errors(stats: dict) -> None:
total = stats["total_errors"]
if total == 0:
print("No batless errors found in the selected log(s).")
return
def pct(n): return f"{C.CYAN}{n/total*100:5.1f}%{C.RESET}"
div = f"{C.BOLD}{C.RED}{'━' * 56}{C.RESET}"
def sec(s): return f" {C.BOLD}{C.YELLOW}{s}{C.RESET}"
print()
print(div)
print(f" {C.BOLD}{C.RED}batless error statistics{C.RESET}")
print(div)
print(f" Total errors : {C.BOLD}{C.RED}{total}{C.RESET}")
print(f" Unique sessions : {C.BOLD}{stats['unique_sessions']}{C.RESET}")
print()
print(sec("── Exit codes ────────────────────────────────────────"))
for code, count in stats["exit_codes"].items():
print(f" exit {code:<12} {C.BOLD}{C.RED}{count:>4}{C.RESET} {pct(count)}")
print()
if stats["top_flag_patterns"]:
print(sec("── Flag patterns at time of error ────────────────────"))
for pattern, count in stats["top_flag_patterns"].items():
print(f" {pattern:<38} {C.BOLD}{count:>4}{C.RESET} {pct(count)}")
print()
if stats["top_stderrs"]:
print(sec("── Error messages ────────────────────────────────────"))
for msg, count in stats["top_stderrs"].items():
display = (msg[:43] + "...") if len(msg) > 46 else msg
print(f" {C.RED}{display:<46}{C.RESET} {C.BOLD}{count:>4}{C.RESET}")
print()
if stats["hourly_distribution"]:
print(sec("── Hourly distribution (UTC) ─────────────────────────"))
max_h = max(stats["hourly_distribution"].values())
for hour, count in sorted(stats["hourly_distribution"].items(), key=lambda x: int(x[0])):
print(f" {int(hour):02d}:00 {bar(count, max_h, 16)} {C.BOLD}{count}{C.RESET}")
print()
print(f" {C.CYAN}Report issues: https://github.com/docdyhr/batless/issues/new/choose{C.RESET}")
print(div)
print()
def print_commands_only(stats: dict) -> None:
total = stats["total_calls"]
if total == 0:
print("No batless calls found.")
return
sigs = stats.get("command_signatures", {})
print()
for sig, count in sigs.items():
pct = f"{count/total*100:.0f}%"
print(f" {sig} {count} {pct}")
print()
def main():
parser = argparse.ArgumentParser(
description="Analyse batless usage logs from batless-logger.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"examples:\n"
" batless-stats today's log\n"
" batless-stats --date 2025-04-01 specific date\n"
" batless-stats --all all logs combined\n"
" batless-stats --session a3f1b2c4 filter by session\n"
" batless-stats --errors error entries only\n"
" batless-stats --errors --json error stats as JSON\n"
" batless-stats --commands unique command signatures\n"
" batless-stats --json machine-readable JSON\n"
" batless-stats --log-dir /tmp/logs override log directory"
),
)
parser.add_argument("--date", metavar="YYYY-MM-DD", help="analyse a specific day")
parser.add_argument("--all", action="store_true", help="analyse all log files")
parser.add_argument("--session", metavar="ID", help="filter by session ID")
parser.add_argument("--json", action="store_true", help="output raw JSON")
parser.add_argument("--commands", action="store_true", help="show only unique command signatures")
parser.add_argument("--errors", action="store_true", help="show only error entries")
parser.add_argument("--log-dir", metavar="DIR", help="override log directory")
args = parser.parse_args()
global LOG_DIR
if args.log_dir:
LOG_DIR = Path(args.log_dir)
if not LOG_DIR.exists():
print(f"No log directory found at {LOG_DIR}")
print("Have you installed batless-logger and run any batless commands?")
sys.exit(0)
all_entries = load_logs(args)
error_entries = [e for e in all_entries if e.get("error")]
normal_entries = [e for e in all_entries if not e.get("error")]
if args.errors:
error_stats = analyse_errors(error_entries)
if args.json:
print(json.dumps(error_stats, indent=2))
else:
print_errors(error_stats)
return
stats = analyse(normal_entries)
if args.json:
print(json.dumps(stats, indent=2))
elif args.commands:
print_commands_only(stats)
else:
print_stats(stats)
if __name__ == "__main__":
main()