causal-triangulations 0.1.0

Causal Dynamical Triangulations in d-dimensions
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
#!/usr/bin/env python3
"""benchmark_models.py - Data models and utilities for benchmark processing.

This module contains data models, parsing functions, and formatting utilities
for benchmark data processing. It provides the core data structures used
throughout the benchmark infrastructure.
"""

import re
import sys
from dataclasses import dataclass


@dataclass
class BenchmarkData:
    """Represents benchmark data for a single test case."""

    points: int | None
    dimension: str
    time_low: float = 0.0
    time_mean: float = 0.0
    time_high: float = 0.0
    time_unit: str = ""
    throughput_low: float | None = None
    throughput_mean: float | None = None
    throughput_high: float | None = None
    throughput_unit: str | None = None
    benchmark_id: str = ""
    simplices: int | None = None

    @property
    def comparison_key(self) -> str:
        """Return the stable key used for baseline/regression matching."""
        if self.benchmark_id:
            return self.benchmark_id
        if self.points is None:
            msg = "Unsized benchmarks require benchmark_id for comparison matching"
            raise ValueError(msg)
        return f"{self.points}_{self.dimension}"

    @property
    def points_label(self) -> str:
        """Return a display label for the benchmark input size."""
        return str(self.points) if self.points is not None else "n/a"

    def header_line(self) -> str:
        """Return the baseline/comparison section header for this benchmark."""
        if self.points is None:
            return f"=== Unsized Workload ({self.dimension}) ==="
        return f"=== {self.points} Points ({self.dimension}) ==="

    def with_timing(self, low: float, mean: float, high: float, unit: str) -> "BenchmarkData":
        """Set timing data (fluent interface)."""
        self.time_low = low
        self.time_mean = mean
        self.time_high = high
        self.time_unit = unit
        return self

    def with_throughput(self, low: float, mean: float, high: float, unit: str) -> "BenchmarkData":
        """Set throughput data (fluent interface)."""
        self.throughput_low = low
        self.throughput_mean = mean
        self.throughput_high = high
        self.throughput_unit = unit
        return self

    def with_simplices(self, count: int | None) -> "BenchmarkData":
        """Set the number of generated maximal simplices."""
        self.simplices = count
        return self

    def to_baseline_format(self) -> str:
        """Convert to baseline file format."""
        lines = [
            self.header_line(),
        ]
        if self.benchmark_id:
            lines.append(f"Benchmark ID: {self.benchmark_id}")
        lines.append(f"Time: [{self.time_low}, {self.time_mean}, {self.time_high}] {self.time_unit}")

        if self.throughput_low is not None and self.throughput_mean is not None and self.throughput_high is not None and self.throughput_unit:
            lines.append(f"Throughput: [{self.throughput_low}, {self.throughput_mean}, {self.throughput_high}] {self.throughput_unit}")

        lines.append("")
        return "\n".join(lines)


@dataclass
class CircumspherePerformanceData:
    """Represents circumsphere containment performance data."""

    method: str  # insphere, insphere_distance, insphere_lifted
    time_ns: float
    relative_performance: float | None = None
    winner: bool = False


@dataclass
class CircumsphereTestCase:
    """Represents a circumsphere test case with multiple method results."""

    test_name: str
    dimension: str
    methods: dict[str, CircumspherePerformanceData]
    is_boundary_case: bool = False  # True for boundary/edge cases with early-exit optimizations

    def get_winner(self) -> str | None:
        """Get the method name with the best performance."""
        if not self.methods:
            return None
        return min(self.methods.keys(), key=lambda m: self.methods[m].time_ns)

    def get_relative_performance(self, method: str, baseline_method: str | None = None) -> float:
        """Calculate relative performance compared to baseline method."""
        if method not in self.methods:
            return 0.0

        if baseline_method is None:
            baseline_method = self.get_winner()

        if baseline_method is None or baseline_method not in self.methods:
            return 1.0

        baseline_time = self.methods[baseline_method].time_ns
        method_time = self.methods[method].time_ns

        if baseline_time <= 0:
            return 1.0

        return method_time / baseline_time


@dataclass
class VersionComparisonData:
    """Represents performance comparison between versions."""

    test_case: str
    method: str
    old_version: str
    new_version: str
    old_value: float
    new_value: float
    unit: str
    improvement_pct: float = 0.0

    def __post_init__(self) -> None:
        """Calculate improvement percentage."""
        if self.old_value > 0:
            self.improvement_pct = ((self.old_value - self.new_value) / self.old_value) * 100
        else:
            self.improvement_pct = 0.0


# Benchmark parsing functions


def parse_benchmark_header(line: str) -> BenchmarkData | None:
    """
    Parse benchmark header lines to extract test configuration.

    Args:
        line: Input line potentially containing benchmark header

    Returns:
        BenchmarkData object or None if no match
    """
    # Match patterns like "=== 1000 Points (2D) ===" or "=== Unsized Workload (4D) ==="
    match = re.match(r"^=== (?:(\d+) Points|Unsized Workload) \((.+)\) ===$", line.strip())
    if match:
        points = int(match.group(1)) if match.group(1) is not None else None
        dimension = match.group(2)
        return BenchmarkData(points=points, dimension=dimension)
    return None


# NOTE: For hot-path parsing of large baseline files in CI, consider precompiling
# the regex patterns below using re.compile() for better performance.


def parse_time_data(benchmark: BenchmarkData, line: str) -> bool:
    """
    Parse time data lines to extract timing information.

    Args:
        benchmark: BenchmarkData object to update
        line: Input line potentially containing time data

    Returns:
        True if data was parsed successfully, False otherwise
    """
    # Match pattern like "Time: [100.0, 110.0, 120.0] µs"
    # Support scientific notation (1.2e3), negative values, and flexible whitespace
    match = re.match(r"^Time:\s*\[([0-9eE+.\-,\s]+)\]\s+(.+)$", line.strip())
    if match:
        try:
            # Parse the list of numbers
            values_str = match.group(1)
            unit = match.group(2)
            values = [float(x.strip()) for x in values_str.split(",")]

            if len(values) == 3:
                benchmark.time_low = values[0]
                benchmark.time_mean = values[1]
                benchmark.time_high = values[2]
                benchmark.time_unit = unit
                return True
        except ValueError:
            pass
    return False


def _parse_benchmark_id_data(benchmark: BenchmarkData, line: str) -> bool:
    """Parse optional baseline benchmark identifier metadata."""
    match = re.match(r"^Benchmark ID:\s*(.+)$", line.strip())
    if match:
        benchmark.benchmark_id = match.group(1).strip()
        return True
    return False


def parse_throughput_data(benchmark: BenchmarkData, line: str) -> bool:
    """
    Parse throughput data lines to extract throughput information.

    Args:
        benchmark: BenchmarkData object to update
        line: Input line potentially containing throughput data

    Returns:
        True if data was parsed successfully, False otherwise
    """
    # Match pattern like "Throughput: [8000.0, 9090.9, 10000.0] Kelem/s"
    # Support scientific notation (1.2e3), negative values, and flexible whitespace
    match = re.match(r"^Throughput:\s*\[([0-9eE+.\-,\s]+)\]\s+(.+)$", line.strip())
    if match:
        try:
            # Parse the list of numbers
            values_str = match.group(1)
            unit = match.group(2)
            values = [float(x.strip()) for x in values_str.split(",")]

            if len(values) == 3:
                benchmark.throughput_low = values[0]
                benchmark.throughput_mean = values[1]
                benchmark.throughput_high = values[2]
                benchmark.throughput_unit = unit
                return True
        except ValueError:
            pass
    return False


def _append_complete_benchmark(benchmarks: list[BenchmarkData], benchmark: BenchmarkData) -> None:
    """Append parsed benchmark data only after the required timing row is present."""
    if benchmark.time_unit:
        benchmarks.append(benchmark)
        return

    print(
        f"Warning: skipping incomplete benchmark section for {benchmark.points_label} Points ({benchmark.dimension}): missing valid Time line",
        file=sys.stderr,
    )


def extract_benchmark_data(baseline_content: str) -> list[BenchmarkData]:
    """
    Extract benchmark data from baseline file content.

    Args:
        baseline_content: Content from baseline results file

    Returns:
        List of BenchmarkData objects parsed from content
    """
    benchmarks = []
    current_benchmark = None

    for line in baseline_content.split("\n"):
        # Try to parse as benchmark header
        benchmark = parse_benchmark_header(line)
        if benchmark:
            # Save previous benchmark if it exists
            if current_benchmark:
                _append_complete_benchmark(benchmarks, current_benchmark)
            current_benchmark = benchmark
            continue

        if current_benchmark:
            if _parse_benchmark_id_data(current_benchmark, line):
                continue

            # Try to parse time data
            if parse_time_data(current_benchmark, line):
                continue

            # Try to parse throughput data
            if parse_throughput_data(current_benchmark, line):
                continue

    # Don't forget the last benchmark
    if current_benchmark:
        _append_complete_benchmark(benchmarks, current_benchmark)

    return benchmarks


# Benchmark formatting functions


def format_time_value(value: float, unit: str) -> str:
    """
    Format time values with appropriate precision and unit conversion.

    Args:
        value: Time value to format
        unit: Current unit of the value

    Returns:
        Formatted time string with appropriate unit, or "N/A" for invalid values
    """
    # Return N/A for zero or negative values (invalid measurements)
    if value <= 0:
        return "N/A"

    # Normalize microsecond aliases to standard µs
    unit = {"us": "µs", "μs": "µs"}.get((unit or "").strip(), (unit or "").strip())
    # Convert µs to ms if >= 1000 µs
    if unit == "µs" and value >= 1000:
        return f"{value / 1000:.3f} ms"
    # Convert ms to s if >= 1000 ms
    if unit == "ms" and value >= 1000:
        return f"{value / 1000:.4f} s"
    if unit == "µs":
        # Use 3 decimal places for values < 1, 2 decimal places otherwise
        if value < 1:
            return f"{value:.3f} µs"
        return f"{value:.2f} µs"
    return f"{value:.2f} {unit}"


def format_throughput_value(value: float | None, unit: str | None) -> str:
    """
    Format throughput values with appropriate precision.

    Args:
        value: Throughput value to format (can be None)
        unit: Unit of the value (can be None)

    Returns:
        Formatted throughput string
    """
    if value is None or unit is None:
        return "N/A"

    # Use 3 decimal places for values < 1 or with fractional parts needing precision
    if value < 1 or (value % 1) != 0:
        return f"{value:.3f} {unit}"
    return f"{value:.2f} {unit}"


def format_count_value(value: int | None) -> str:
    """
    Format a count value for benchmark tables.

    Args:
        value: Count value to format (can be None)

    Returns:
        Formatted count string
    """
    if value is None:
        return "N/A"
    return f"{value:,}"


def format_benchmark_tables(
    benchmarks: list[BenchmarkData],
    *,
    input_label: str = "Points",
    include_simplices: bool = False,
) -> list[str]:
    """
    Format benchmark data as markdown tables grouped by dimension.

    Args:
        benchmarks: List of BenchmarkData objects to format
        input_label: Display label for the benchmark input-size column
        include_simplices: When true, replace the scaling column with generated
            maximal simplex counts.

    Returns:
        List of markdown lines containing formatted tables
    """
    lines = []

    # Group benchmarks by dimension
    by_dimension: dict[str, list[BenchmarkData]] = {}
    for bench in benchmarks:
        dim = bench.dimension
        if dim not in by_dimension:
            by_dimension[dim] = []
        by_dimension[dim].append(bench)

    # Sort dimensions numerically (2D, 3D, etc.) rather than lexically
    def _dim_key(d: str) -> tuple[int, str]:
        """Sort key for dimensions: numeric prefix first, then string fallback."""
        m = re.match(r"^\s*(\d+)\s*[dD]\b", d)
        return (int(m.group(1)) if m else 1_000_000, d)

    for dimension in sorted(by_dimension.keys(), key=_dim_key):
        dim_benchmarks = sorted(
            by_dimension[dimension],
            key=lambda b: (b.points is None, b.points or 0, b.benchmark_id or b.header_line()),
        )
        include_benchmark_id = any(bench.benchmark_id for bench in dim_benchmarks)

        lines.extend([f"### {dimension} Triangulation Performance", ""])
        final_column_label = "Simplices Generated" if include_simplices else "Scaling"
        final_column_separator = "---------------------" if include_simplices else "----------"
        if include_benchmark_id:
            lines.extend(
                [
                    f"| Benchmark ID | {input_label} | Time (mean) | Throughput (mean) | {final_column_label} |",
                    f"|--------------|--------|-------------|-------------------|{final_column_separator}|",
                ],
            )
        else:
            lines.extend(
                [
                    f"| {input_label} | Time (mean) | Throughput (mean) | {final_column_label} |",
                    f"|--------|-------------|-------------------|{final_column_separator}|",
                ],
            )

        # Calculate scaling relative to the smallest numeric workload only for
        # legacy homogeneous tables. Expanded benchmark IDs mix different API
        # surfaces, so a single per-dimension scaling baseline is misleading.
        first_nonzero = None if include_benchmark_id else next((b for b in dim_benchmarks if b.time_mean and b.time_mean > 0), None)
        baseline_time = first_nonzero.time_mean if first_nonzero else None

        for bench in dim_benchmarks:
            # Format time and throughput
            time_str = format_time_value(bench.time_mean, bench.time_unit) if bench.time_unit else "N/A"
            throughput_str = (
                format_throughput_value(bench.throughput_mean, bench.throughput_unit) if bench.throughput_unit and bench.throughput_mean is not None else "N/A"
            )

            if include_simplices:
                final_column_str = format_count_value(bench.simplices)
            elif bench.time_mean > 0 and baseline_time and baseline_time > 0:
                scaling = bench.time_mean / baseline_time
                final_column_str = f"{scaling:.1f}x"
            else:
                final_column_str = "N/A"

            if include_benchmark_id:
                lines.append(f"| `{bench.comparison_key}` | {bench.points_label} | {time_str} | {throughput_str} | {final_column_str} |")
            else:
                lines.append(f"| {bench.points_label} | {time_str} | {throughput_str} | {final_column_str} |")

        lines.append("")  # Empty line between tables

    return lines