robin-sparkless 4.0.0

PySpark-like DataFrame API in Rust on Polars; no JVM.
Documentation
"""Tests for DataFrame.first() method.

This module tests the first() method which returns the first row of a DataFrame.
Uses conftest's spark fixture for backend-aware execution (mock/PySpark).
"""

from tests.fixtures.spark_imports import get_spark_imports


def _imports():
    return get_spark_imports()


class TestDataFrameFirst:
    """Test suite for DataFrame.first() method."""

    def test_first_returns_single_row(self, spark):
        """Test that first() returns a single Row object, not a list."""
        df = spark.createDataFrame([{"name": "Alice"}, {"name": "Bob"}])
        result = df.first()

        # Should be a single Row, not a list
        assert result is not None
        assert not isinstance(result, list)
        assert result["name"] == "Alice"

    def test_first_empty_dataframe_returns_none(self, spark):
        """Test that first() returns None for empty DataFrame."""
        imp = _imports()
        schema = imp.StructType([imp.StructField("name", imp.StringType())])
        # PySpark: createDataFrame(data, schema); sparkless: createDataFrame([], schema=schema)
        try:
            df = spark.createDataFrame([], schema=schema)
        except TypeError:
            df = spark.createDataFrame([], schema)
        result = df.first()

        assert result is None

    def test_first_with_multiple_columns(self, spark):
        """Test first() with multiple columns."""
        data = [
            {"name": "Alice", "age": 25, "city": "NYC"},
            {"name": "Bob", "age": 30, "city": "LA"},
        ]
        df = spark.createDataFrame(data)
        result = df.first()

        assert result["name"] == "Alice"
        assert result["age"] == 25
        assert result["city"] == "NYC"

    def test_first_after_filter(self, spark):
        """Test first() after a filter operation."""
        data = [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 35},
        ]
        df = spark.createDataFrame(data)
        filtered = df.filter("age > 25")
        result = filtered.first()

        assert result["name"] == "Bob"
        assert result["age"] == 30

    def test_first_after_orderby(self, spark):
        """Test first() after orderBy operation."""
        data = [
            {"name": "Charlie", "value": 3},
            {"name": "Alice", "value": 1},
            {"name": "Bob", "value": 2},
        ]
        df = spark.createDataFrame(data)
        ordered = df.orderBy("value")
        result = ordered.first()

        assert result["name"] == "Alice"
        assert result["value"] == 1

    def test_first_after_select(self, spark):
        """Test first() after select operation."""
        data = [{"name": "Alice", "age": 25}]
        df = spark.createDataFrame(data)
        selected = df.select("name")
        result = selected.first()

        assert result["name"] == "Alice"
        # age should not be in the result (KeyError or backend-specific)
        row_dict = result.asDict() if hasattr(result, "asDict") else dict(result)
        assert "age" not in row_dict

    def test_first_vs_head_difference(self, spark):
        """Test that first() and head() have different return types.

        first() returns a single Row (or None)
        head() without argument returns a single Row
        head(n) returns a list
        """
        data = [{"name": "Alice"}, {"name": "Bob"}]
        df = spark.createDataFrame(data)

        first_result = df.first()
        head_result = df.head()
        head_n_result = df.head(1)

        # first() returns single Row
        assert not isinstance(first_result, list)
        assert first_result["name"] == "Alice"

        # head() without arg returns single Row
        assert not isinstance(head_result, list)
        assert head_result["name"] == "Alice"

        # head(n) returns list
        assert isinstance(head_n_result, list)
        assert len(head_n_result) == 1
        assert head_n_result[0]["name"] == "Alice"

    def test_first_with_null_values(self, spark):
        """Test first() works correctly with null values in data."""
        data = [
            {"name": None, "value": 1},
            {"name": "Bob", "value": 2},
        ]
        df = spark.createDataFrame(data)
        result = df.first()

        assert result["name"] is None
        assert result["value"] == 1

    def test_first_single_row_dataframe(self, spark):
        """Test first() on DataFrame with exactly one row."""
        df = spark.createDataFrame([{"value": 42}])
        result = df.first()

        assert result is not None
        assert result["value"] == 42

    def test_first_after_groupby_agg(self, spark):
        """Test first() after groupBy and aggregation."""
        data = [
            {"dept": "A", "salary": 100},
            {"dept": "A", "salary": 200},
            {"dept": "B", "salary": 150},
        ]
        df = spark.createDataFrame(data)
        grouped = df.groupBy("dept").agg({"salary": "max"})
        result = grouped.first()

        assert result is not None
        assert "dept" in result.asDict()
        assert "max(salary)" in result.asDict() or "max_salary" in result.asDict()

    def test_first_after_join(self, spark):
        """Test first() after join operation."""
        df1 = spark.createDataFrame([{"id": 1, "name": "Alice"}])
        df2 = spark.createDataFrame([{"id": 1, "city": "NYC"}])
        joined = df1.join(df2, "id")
        result = joined.first()

        assert result is not None
        assert result["name"] == "Alice"
        assert result["city"] == "NYC"

    def test_first_after_union(self, spark):
        """Test first() after union operation."""
        df1 = spark.createDataFrame([{"val": 1}])
        df2 = spark.createDataFrame([{"val": 2}])
        unioned = df1.union(df2)
        result = unioned.first()

        assert result is not None
        assert result["val"] in [1, 2]  # Order may vary

    def test_first_with_nested_struct(self, spark):
        """Test first() with nested struct types."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField(
                    "address",
                    imp.StructType(
                        [
                            imp.StructField("city", imp.StringType()),
                            imp.StructField("zip", imp.IntegerType()),
                        ]
                    ),
                ),
            ]
        )
        data = [{"name": "Alice", "address": {"city": "NYC", "zip": 10001}}]
        try:
            df = spark.createDataFrame(data, schema=schema)
        except TypeError:
            df = spark.createDataFrame(data, schema)
        result = df.first()

        assert result is not None
        assert result["name"] == "Alice"
        assert result["address"]["city"] == "NYC"
        assert result["address"]["zip"] == 10001

    def test_first_with_array_column(self, spark):
        """Test first() with array column."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField("scores", imp.ArrayType(imp.IntegerType())),
            ]
        )
        data = [{"name": "Alice", "scores": [85, 90, 95]}]
        try:
            df = spark.createDataFrame(data, schema=schema)
        except TypeError:
            df = spark.createDataFrame(data, schema)
        result = df.first()

        assert result is not None
        assert result["name"] == "Alice"
        assert result["scores"] == [85, 90, 95]

    def test_first_after_multiple_transformations(self, spark):
        """Test first() after multiple chained transformations."""
        imp = _imports()
        F = imp.F
        data = [
            {"name": "Alice", "age": 25, "score": 85},
            {"name": "Bob", "age": 30, "score": 90},
            {"name": "Charlie", "age": 35, "score": 75},
        ]
        df = spark.createDataFrame(data)
        result = (
            df.filter(F.col("age") > 25)
            .select("name", "score")
            .orderBy("score")
            .first()
        )

        assert result is not None
        # After ordering by score ascending, first should be the lowest score (75 = Charlie)
        # But order may vary by backend, so check that it's one of the valid rows
        assert result["name"] in ["Bob", "Charlie"]
        assert result["score"] in [75, 90]
        # Verify it's a valid row from the filtered set (age > 25)
        assert result["name"] != "Alice"  # Alice was filtered out

    def test_first_with_different_data_types(self, spark):
        """Test first() with various data types."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("str_col", imp.StringType()),
                imp.StructField("int_col", imp.IntegerType()),
                imp.StructField("double_col", imp.DoubleType()),
                imp.StructField("bool_col", imp.BooleanType()),
            ]
        )
        data = [
            {
                "str_col": "test",
                "int_col": 42,
                "double_col": 3.14,
                "bool_col": True,
            }
        ]
        try:
            df = spark.createDataFrame(data, schema=schema)
        except TypeError:
            df = spark.createDataFrame(data, schema)
        result = df.first()

        assert result is not None
        assert result["str_col"] == "test"
        assert result["int_col"] == 42
        assert result["double_col"] == 3.14
        assert result["bool_col"] is True

    def test_first_after_distinct(self, spark):
        """Test first() after distinct operation."""
        data = [
            {"val": 1},
            {"val": 1},
            {"val": 2},
        ]
        df = spark.createDataFrame(data)
        distinct_df = df.distinct()
        result = distinct_df.first()

        assert result is not None
        assert result["val"] in [1, 2]  # Order may vary

    def test_first_with_all_nulls(self, spark):
        """Test first() when all columns are null."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField("age", imp.IntegerType()),
            ]
        )
        data = [{"name": None, "age": None}]
        try:
            df = spark.createDataFrame(data, schema=schema)
        except TypeError:
            df = spark.createDataFrame(data, schema)
        result = df.first()

        assert result is not None
        assert result["name"] is None
        assert result["age"] is None

    def test_first_after_dropna(self, spark):
        """Test first() after dropna operation."""
        data = [
            {"name": "Alice", "age": 25},
            {"name": None, "age": None},
            {"name": "Bob", "age": 30},
        ]
        df = spark.createDataFrame(data)
        cleaned = df.dropna()
        result = cleaned.first()

        assert result is not None
        assert result["name"] in ["Alice", "Bob"]  # Order may vary
        assert result["age"] is not None