import argparse
import json
import statistics
import time
from typing import Any, Dict, List, Optional
from sparkless.sql import SparkSession, functions as F
def _build_spark() -> SparkSession:
    """Return the SparkSession used by the benchmark.

    The session is obtained from the builder via ``getOrCreate`` with the
    application name ``window_like_benchmark``.
    """
    builder = SparkSession.builder.appName("window_like_benchmark")
    return builder.getOrCreate()
def _generate_data(rows: int, groups: int) -> List[Dict[str, Any]]:
if groups <= 0:
raise ValueError("groups must be positive")
data: List[Dict[str, Any]] = []
for i in range(rows):
g = i % groups
data.append(
{
"group": int(g),
"id": int(i),
"value": float(i % 100),
}
)
return data
def _run_single_workload(
    spark: SparkSession, rows: int, groups: int
) -> Dict[str, float]:
    """Run the benchmark pipeline once and time each step.

    The pipeline builds a DataFrame from generated records, aggregates it
    per group (count, sum and average of ``value``), joins the aggregates
    back onto the rows, orders by ``group`` and ``id``, and collects the
    result. Data generation itself is not timed.

    Args:
        spark: Session used to create the DataFrame.
        rows: Number of input records to generate.
        groups: Number of distinct group keys in the generated data.

    Returns:
        Elapsed seconds (``time.perf_counter`` deltas) keyed by step name:
        ``createDataFrame``, ``groupBy_agg``, ``join``, ``orderBy`` and
        ``collect``.

    NOTE(review): if the engine evaluates transformations lazily, the
    ``groupBy_agg``/``join``/``orderBy`` figures would cover plan
    construction only, with the real cost landing in ``collect`` -- confirm
    against the engine's execution model.
    """
    records = _generate_data(rows=rows, groups=groups)
    elapsed: Dict[str, float] = {}

    started = time.perf_counter()
    frame = spark.createDataFrame(records)
    elapsed["createDataFrame"] = time.perf_counter() - started

    started = time.perf_counter()
    per_group = frame.groupBy("group").agg(
        F.count(F.lit(1)).alias("count"),
        F.sum("value").alias("sum_value"),
        F.avg("value").alias("avg_value"),
    )
    elapsed["groupBy_agg"] = time.perf_counter() - started

    started = time.perf_counter()
    enriched = frame.join(by_group := per_group, on="group", how="inner") if False else frame.join(per_group, on="group", how="inner")
    elapsed["join"] = time.perf_counter() - started

    started = time.perf_counter()
    sorted_frame = enriched.orderBy("group", "id")
    elapsed["orderBy"] = time.perf_counter() - started

    started = time.perf_counter()
    # Keep the result bound to a name so that releasing it is not part of
    # the measured interval.
    collected = sorted_frame.collect()
    elapsed["collect"] = time.perf_counter() - started

    return elapsed
def _aggregate_timings(samples: List[Dict[str, float]]) -> Dict[str, Dict[str, float]]:
if not samples:
return {}
keys = samples[0].keys()
aggregated: Dict[str, Dict[str, float]] = {}
for key in keys:
values = [s[key] for s in samples]
aggregated[key] = {
"mean": statistics.fmean(values),
"median": statistics.median(values),
"min": min(values),
"max": max(values),
}
return aggregated
def run_benchmark(rows: int, groups: int, repetitions: int) -> Dict[str, Any]:
    """Run the workload repeatedly and return raw and aggregated timings.

    Args:
        rows: Number of input records per run.
        groups: Number of distinct group keys per run.
        repetitions: Number of measured runs.

    Returns:
        A dict with ``params`` (the arguments passed in), ``per_run`` (one
        timing dict per measured run) and ``stats`` (per-step summary
        statistics).

    Raises:
        ValueError: If ``repetitions`` is zero or negative.
    """
    if repetitions <= 0:
        raise ValueError("repetitions must be positive")

    session = _build_spark()
    try:
        # One extra pass whose timings are discarded -- presumably a
        # warm-up so that first-use costs do not skew the measured runs.
        _run_single_workload(session, rows=rows, groups=groups)

        measured: List[Dict[str, float]] = [
            _run_single_workload(session, rows=rows, groups=groups)
            for _ in range(repetitions)
        ]
        return {
            "params": {
                "rows": rows,
                "groups": groups,
                "repetitions": repetitions,
            },
            "per_run": measured,
            "stats": _aggregate_timings(measured),
        }
    finally:
        # Stop the session whether the runs succeeded or raised.
        session.stop()
def main(argv: Optional[List[str]] = None) -> None:
    """Parse command-line options, run the benchmark and print JSON results.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Options: ``--rows`` (default 100000), ``--groups`` (default 1000),
    ``--repetitions`` (default 5) and ``--indent`` (default 2). The result
    is written to stdout as JSON with sorted keys.
    """
    cli = argparse.ArgumentParser(
        description="Window-like groupBy+join benchmark for Sparkless 4.x."
    )
    # Plain integer options that differ only in name and default.
    int_options = (
        ("--rows", 100_000),
        ("--groups", 1_000),
        ("--repetitions", 5),
    )
    for flag, default in int_options:
        cli.add_argument(flag, type=int, default=default)
    cli.add_argument(
        "--indent",
        type=int,
        default=2,
        help="JSON indentation level for pretty-printing results.",
    )

    options = cli.parse_args(argv)
    report = run_benchmark(
        rows=options.rows,
        groups=options.groups,
        repetitions=options.repetitions,
    )
    print(json.dumps(report, indent=options.indent, sort_keys=True))
# Run the benchmark CLI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()