# Test Creation Guide: PySpark Parity via Fixtures

This document explains how to create and maintain **behavioral parity tests** between PySpark and `robin-sparkless`.

**Sparkless integration**: Robin-sparkless is designed to replace the backend of [Sparkless](https://github.com/eddiethedean/sparkless). Sparkless has 270+ expected_outputs that can be converted to robin-sparkless fixtures. See [SPARKLESS_INTEGRATION_ANALYSIS.md](SPARKLESS_INTEGRATION_ANALYSIS.md) §4 for fixture format comparison and conversion strategy.

The goal is to:
- Use **PySpark as the oracle** for correct behavior.
- Generate **JSON fixtures** from PySpark runs.
- Consume those fixtures in **Rust tests** to ensure `robin-sparkless` matches PySpark.

**Note:** Running the Rust test suite (`make test`, `make check-full`) does
**not** require PySpark or Java. PySpark/Java 17+ are only required when
**generating or refreshing** fixtures (e.g. via `tests/convert_sparkless_fixtures.py`
and `tests/regenerate_expected_from_pyspark.py`, or when running
`make sparkless-parity` with regeneration enabled).

---

## 1. Overview

We separate the flow into three layers:

1. **Scenario definition (Python + PySpark, optional)**  
   Small scripts that build PySpark `DataFrame`s, apply operations, and record the expected results when you need new fixtures or want to refresh existing ones.

2. **Fixtures (JSON)**  
   A stable, machine-readable description of:
   - Input schema and rows  
   - The sequence of operations  
   - Expected schema and rows

3. **Rust test harness**  
   `tests/parity.rs` reads fixtures, reconstructs the input as a `robin-sparkless` `DataFrame`, applies the same operations, and compares results.

---

## 2. Fixture Format

Fixtures live under `tests/fixtures/` and are plain JSON files.

### Minimal schema

```jsonc
{
  "name": "filter_age_gt_30",
  "pyspark_version": "3.5.0",

  "input": {
    "schema": [
      { "name": "id",   "type": "int" },
      { "name": "age",  "type": "int" },
      { "name": "name", "type": "string" }
    ],
    "rows": [
      [1, 25, "Alice"],
      [2, 30, "Bob"],
      [3, 35, "Charlie"]
    ]
  },

  "operations": [
    { "op": "filter",  "expr": "col('age') > 30" },
    { "op": "select",  "columns": ["name", "age"] },
    { "op": "orderBy", "columns": ["name"], "ascending": [true] }
  ],

  "expected": {
    "schema": [
      { "name": "name", "type": "string" },
      { "name": "age",  "type": "int" }
    ],
    "rows": [
      ["Charlie", 35]
    ]
  }
}
```

**Notes:**
- `type` uses PySpark’s simple type names (`int`, `bigint`, `string`, `double`, `boolean`, etc.).
- `rows` are lists of JSON values; `null` is allowed.
- `operations` are **descriptive** records, not executable Python. The only embedded code is a simple PySpark-style expression string (the `expr` field, as in `filter`), which keeps parity with the PySpark source obvious.
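
A quick structural check can catch malformed fixtures before they reach the Rust harness. The following is a minimal sketch, not part of the project: `check_fixture` and `REQUIRED_KEYS` are hypothetical names, and treating all four top-level keys as required is an assumption based on the minimal schema above.

```python
import json

# Keys taken from the minimal schema above; treating all of them as
# required is an assumption of this sketch.
REQUIRED_KEYS = ("name", "input", "operations", "expected")


def check_fixture(fixture: dict) -> list:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = [f"missing key: {k}" for k in REQUIRED_KEYS if k not in fixture]
    for section in ("input", "expected"):
        part = fixture.get(section, {})
        width = len(part.get("schema", []))
        for i, row in enumerate(part.get("rows", [])):
            # Every row must supply exactly one value per schema column.
            if len(row) != width:
                problems.append(
                    f"{section}.rows[{i}] has {len(row)} values, schema has {width}"
                )
    for i, op in enumerate(fixture.get("operations", [])):
        if "op" not in op:
            problems.append(f"operations[{i}] has no 'op' field")
    return problems


example = json.loads("""
{
  "name": "filter_age_gt_30",
  "input": {
    "schema": [{"name": "id", "type": "int"}, {"name": "age", "type": "int"}],
    "rows": [[1, 25], [2, 30], [3]]
  },
  "operations": [{"op": "filter", "expr": "col('age') > 30"}],
  "expected": {"schema": [{"name": "id", "type": "int"}], "rows": []}
}
""")

for problem in check_fixture(example):
    print(problem)  # prints: input.rows[2] has 1 values, schema has 2
```

A check like this could run in the generator script just before each fixture is written to disk.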

### Date and timestamp columns

The parity harness supports **date** and **timestamp** (or **datetime**) column types so that datetime functions (e.g. `date_add`, `datediff`, `current_date`) can be tested with real date/datetime inputs.

- **Schema**: Use `"type": "date"` or `"type": "timestamp"` (or `"datetime"`, `"timestamp_ntz"`) in `input.schema` and `expected.schema`.
- **Row values**: Use JSON strings in ISO format:
  - **Date**: `"YYYY-MM-DD"` (e.g. `"2024-01-15"`).
  - **Timestamp**: `"YYYY-MM-DDTHH:MM:SS"` or `"YYYY-MM-DDTHH:MM:SS.ffffff"` (e.g. `"2024-01-15T12:00:00"`, `"2024-01-15T12:00:00.123456"`).
- A timestamp can also be given as a JSON number (epoch microseconds) when building the column.
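
On the generator side, Python's `json` module cannot serialize `date` or `datetime` values on its own, so rows containing them need a conversion step. The sketch below shows one way to produce the ISO strings described above and to interpret an epoch-microseconds value. The helper names `to_fixture_value` and `from_epoch_micros` are hypothetical, and reading the epoch value as UTC is an assumption.

```python
import json
from datetime import date, datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_fixture_value(value):
    """`default` hook for json.dumps: render dates and timestamps as ISO strings."""
    # datetime is a subclass of date, so one check covers both. A naive
    # datetime yields "YYYY-MM-DDTHH:MM:SS" plus ".ffffff" when the
    # microsecond field is non-zero.
    if isinstance(value, date):
        return value.isoformat()
    raise TypeError(f"not JSON serializable: {type(value).__name__}")


def from_epoch_micros(micros: int) -> datetime:
    """Naive datetime for an epoch-microseconds value (UTC assumed)."""
    return EPOCH + timedelta(microseconds=micros)


row = [1, date(2024, 1, 15), datetime(2024, 1, 15, 12, 0, 0, 123456), None]
print(json.dumps(row, default=to_fixture_value))
# prints: [1, "2024-01-15", "2024-01-15T12:00:00.123456", null]

# Round trip between a datetime and its epoch-microseconds representation.
ts = datetime(2024, 1, 15, 12, 0, 0)
micros = (ts - EPOCH) // timedelta(microseconds=1)
assert from_epoch_micros(micros) == ts
```

In the example skeleton in section 3, passing `default=to_fixture_value` to `json.dumps` would be enough to handle date and timestamp columns.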

---

## 3. PySpark Generator Script

**Requirements:** PySpark (`pip install pyspark`) and **Java 17 or newer**. If PySpark fails to start with a JVM error or an `UnsupportedClassVersionError`, set `JAVA_HOME` to a JDK 17+ installation.

Create a Python script, e.g. `tests/gen_pyspark_cases.py`, that:

1. Starts a PySpark `SparkSession` (pinned PySpark version).
2. Defines **scenario functions** that:
   - Build a PySpark `DataFrame` from literal data.
   - Apply a sequence of transformations/actions.
   - Collect both **input** and **expected** sections.
3. Writes JSON fixtures into `tests/fixtures/`.

### Example skeleton

```python
from pyspark.sql import SparkSession
import json
from pathlib import Path

spark = SparkSession.builder.appName("rs_parity_gen").getOrCreate()


def case_filter_age_gt_30():
    data = [(1, 25, "Alice"), (2, 30, "Bob"), (3, 35, "Charlie")]
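    # Explicit DDL schema: with column names only, PySpark infers Python ints
    # as bigint, which would not match the "int" types in the fixture example.
    df = spark.createDataFrame(data, schema="id int, age int, name string")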

    out_df = df.filter("age > 30").select("name", "age").orderBy("name")

    def schema_to_json(schema):
        return [{"name": f.name, "type": f.dataType.simpleString()}
                for f in schema.fields]

    input_schema = schema_to_json(df.schema)
    input_rows = [list(r) for r in df.collect()]

    expected_schema = schema_to_json(out_df.schema)
    expected_rows = [list(r) for r in out_df.collect()]

    return {
        "name": "filter_age_gt_30",
        "pyspark_version": spark.version,
        "input": {"schema": input_schema, "rows": input_rows},
        "operations": [
            {"op": "filter", "expr": "col('age') > 30"},
            {"op": "select", "columns": ["name", "age"]},
            {"op": "orderBy", "columns": ["name"], "ascending": [True]}
        ],
        "expected": {"schema": expected_schema, "rows": expected_rows},
    }


def main():
    out_dir = Path("tests/fixtures")
    out_dir.mkdir(parents=True, exist_ok=True)

    fixtures = [
        case_filter_age_gt_30(),
        # add more scenarios here
    ]

    for fx in fixtures:
        path = out_dir / f"{fx['name']}.json"
        path.write_text(json.dumps(fx, indent=2))


if __name__ == "__main__":
    main()
```

Run this script manually (or via a Makefile target) whenever you add or update scenarios.

---

## 4. Rust Test Harness (`tests/parity.rs`)

The file `tests/parity.rs` implements the full parity flow:

- It defines `Fixture`, `InputSection`, `ColumnSpec`, `ExpectedSection`, and `Operation` to mirror the JSON format.
- The test `pyspark_parity_fixtures`:
  - Scans `tests/fixtures/`, `tests/fixtures/converted/`, and `tests/fixtures/pyspark_extracted/`.
  - Deserializes each JSON fixture and skips any with `"skip": true`.
  - For each fixture, calls `run_fixture(&fixture)`, which:
    1. **Input reconstruction**: Builds a `DataFrame` from `input.schema` and `input.rows` (or from `input.file_source` for read_csv/read_parquet/read_json).
    2. **Operation dispatch**: Applies each `Operation` in sequence (filter, select, orderBy, groupBy, agg, join, withColumn, window, union, distinct, drop, dropna, fillna, limit, withColumnRenamed, replace, crossJoin, describe, subtract, intersect, first, head, offset, summary, etc.).
    3. **Comparison**: Collects the result to schema + rows and compares to `expected` (schema equality and row equality with optional order handling).
  - Collects all failures and reports them at the end with fixture names (so CI shows every failing fixture, not just the first).
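
The comparison and failure-collection steps can be modeled in a few lines of Python. This is only an illustrative sketch of the idea, not the Rust implementation in `tests/parity.rs`: the names `rows_match` and `compare_result` and the `ordered` flag are hypothetical.

```python
import json


def rows_match(actual, expected, ordered=True):
    """Compare two row lists; when `ordered` is False, ignore row order."""
    if not ordered:
        # Sort by a canonical JSON encoding so rows containing null (None)
        # or mixed types can still be ordered deterministically.
        actual = sorted(actual, key=json.dumps)
        expected = sorted(expected, key=json.dumps)
    return actual == expected


def compare_result(fixture_name, actual, expected, ordered=True):
    """Return failure messages for one fixture (empty list on success)."""
    failures = []
    if actual["schema"] != expected["schema"]:
        failures.append(f"{fixture_name}: schema mismatch")
    if not rows_match(actual["rows"], expected["rows"], ordered):
        failures.append(f"{fixture_name}: row mismatch")
    return failures


schema = [{"name": "name", "type": "string"}]
expected = {"schema": schema, "rows": [["Alice"], [None]]}
actual = {"schema": schema, "rows": [[None], ["Alice"]]}

# Collect every failure first and report at the end, so that one failing
# fixture does not hide the others.
all_failures = []
all_failures += compare_result("unordered_case", actual, expected, ordered=False)
all_failures += compare_result("ordered_case", actual, expected, ordered=True)
print(all_failures)  # prints: ['ordered_case: row mismatch']
```

Order-insensitive comparison matters for operations such as `groupBy`, where PySpark does not guarantee row order unless the fixture ends with an explicit `orderBy`.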

**Running a single fixture**: Set the `PARITY_FIXTURE` environment variable to the fixture `name` (e.g. `PARITY_FIXTURE=groupby_count`) to run only that fixture. Example:

```bash
PARITY_FIXTURE=groupby_count cargo test pyspark_parity_fixtures
```

**Running by phase**: Set the `PARITY_PHASE` environment variable to run only fixtures in that phase's manifest. Phases are `a`–`g`; the mapping lives in `tests/fixtures/phase_manifest.json`. Example:

```bash
PARITY_PHASE=a cargo test pyspark_parity_fixtures
make test-parity-phase-a   # same as above
make test-parity-phases    # runs all phases (a through g)
```

Phase A = signature alignment; B = high-value functions; C = Reader/Writer; D = DataFrame methods; E = SparkSession/Catalog (no fixtures, Python-only); F = behavioral; G = fixture expansion. See [PARITY_STATUS.md](PARITY_STATUS.md) for the phase-to-fixture mapping.

### Extending the harness

When adding new operations or expression forms:

1. **New operation type**: Add a variant to the `Operation` enum in `parity.rs`, deserialize it from the fixture JSON, and handle it in `apply_operations`.
2. **New expression in filter/withColumn**: Extend `parse_with_column_expr` (and any related helpers) so the fixture `expr` string is parsed into the correct Rust `Expr`.
3. **Comparison**: Schema and row comparison already support nulls, numbers, strings, arrays; extend `values_equal` / `compare_values` if new types need special handling.

---

## 5. Workflow for Adding a New Parity Test

1. **Pick a PySpark behavior to cover**
   - Example: `groupBy("name").count()` with nulls in the grouping column.

2. **Add a scenario function** in `gen_pyspark_cases.py`
   - Build the PySpark input `DataFrame`.
   - Apply the desired operations.
   - Emit a fixture JSON.

3. **Regenerate fixtures**
   - Run `python tests/gen_pyspark_cases.py`.

4. **Update Rust harness if needed**
   - If the new scenario uses an operation the dispatcher does not yet support, add it to the `Operation` enum and its dispatch logic (see "Extending the harness" in section 4).

5. **Update phase manifest (optional)**  
   - Add the new fixture name to the appropriate phase in `tests/fixtures/phase_manifest.json` so it is included in phase-specific tests (`make test-parity-phase-X`).

6. **Run tests**
   - `cargo test pyspark_parity_fixtures` (runs all fixtures).
   - To run a single fixture: `PARITY_FIXTURE=<name> cargo test pyspark_parity_fixtures`.
   - To run by phase: `PARITY_PHASE=a cargo test pyspark_parity_fixtures` or `make test-parity-phase-a`.

---

## 6. Tracking Coverage

Maintain a simple matrix (e.g. in `PARITY_STATUS.md` or `ROADMAP.md`) with:

- Rows: operations / behaviors (e.g. `filter >`, `filter == null`, `groupBy+count`, `inner join`, etc.).
- Columns: data-type combinations, null presence, edge cases.
- Cell values: “covered by fixture X”, “not yet covered”, or “intentionally diverges from PySpark”.
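
A first draft of the matrix rows can be derived from the fixtures themselves. The sketch below scans a fixture directory and lists which fixtures exercise each `op`. The function names are hypothetical, it relies only on the `name` and `operations` fields shown in section 2, and it does not descend into subdirectories.

```python
import json
from pathlib import Path


def ops_by_fixture(fixture_dir):
    """Map each operation name to the sorted fixture names that use it."""
    coverage = {}
    for path in sorted(Path(fixture_dir).glob("*.json")):
        fixture = json.loads(path.read_text())
        # Fall back to the file stem if a fixture has no "name" field.
        name = fixture.get("name", path.stem)
        for op in fixture.get("operations", []):
            coverage.setdefault(op["op"], set()).add(name)
    return {op: sorted(names) for op, names in sorted(coverage.items())}


def to_markdown(coverage):
    """Render the coverage mapping as a Markdown table."""
    lines = ["| Operation | Covered by |", "| --- | --- |"]
    for op, names in coverage.items():
        lines.append(f"| `{op}` | {', '.join(names)} |")
    return "\n".join(lines)


if __name__ == "__main__":
    print(to_markdown(ops_by_fixture("tests/fixtures")))
```

The output only shows which operations appear in at least one fixture. Data-type combinations, null handling, and intentional divergences still need to be recorded by hand.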

This makes it clear where Robin Sparkless truly emulates PySpark today and where work remains.

---

## 7. Sparkless Fixture Conversion

Sparkless uses a different fixture format (`input_data` as dict rows, `expected_output` with `schema`/`data`). A converter can:

1. Read Sparkless `tests/expected_outputs/*.json`
2. Map `input_data` → `input.schema` + `input.rows`
3. Infer or annotate `operations` from the test name/category
4. Map `expected_output` → `expected.schema` + `expected.rows`
5. Write robin-sparkless fixtures to `tests/fixtures/`

This lets both projects validate against the same logical test cases. See [SPARKLESS_INTEGRATION_ANALYSIS.md](SPARKLESS_INTEGRATION_ANALYSIS.md) §4.1–4.4 for details.
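
The core of step 2 is turning dict rows into a schema plus positional rows. The following is a rough sketch of that mapping, not the project's actual converter: `infer_type` and `dict_rows_to_section` are hypothetical helpers, and guessing types from Python values (including the fallback to `string` for all-null columns) is an assumption made for illustration.

```python
def infer_type(values):
    """Guess a PySpark simple type name from a column's non-null values."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return "string"  # assumption: no evidence, so fall back to string
    # bool must be tested before int: in Python, bool is a subclass of int.
    if all(isinstance(v, bool) for v in non_null):
        return "boolean"
    if any(isinstance(v, bool) for v in non_null):
        return "string"  # mixed bool and non-bool values: do not guess
    if all(isinstance(v, int) for v in non_null):
        return "bigint"
    if all(isinstance(v, (int, float)) for v in non_null):
        return "double"
    return "string"


def dict_rows_to_section(dict_rows):
    """Convert [{"col": value, ...}, ...] into {"schema": [...], "rows": [...]}."""
    # Preserve first-seen column order across all rows.
    columns = []
    for row in dict_rows:
        for key in row:
            if key not in columns:
                columns.append(key)
    schema = [
        {"name": c, "type": infer_type([r.get(c) for r in dict_rows])}
        for c in columns
    ]
    # Keys missing from a row become null (None) in the positional row.
    rows = [[r.get(c) for c in columns] for r in dict_rows]
    return {"schema": schema, "rows": rows}


input_data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": None}]
print(dict_rows_to_section(input_data))
```

When the Sparkless file already carries an explicit schema, as `expected_output` does, that schema should take precedence over inference.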