from tests.fixtures.spark_imports import get_spark_imports
def _imports():
    """Fetch the Spark import bundle from the shared test fixtures.

    Thin wrapper around ``get_spark_imports``. The tests in this module read
    type constructors (for example ``StructType`` and ``StringType``) and
    ``F`` as attributes of the returned object.
    """
    spark_imports = get_spark_imports()
    return spark_imports
class TestDataFrameFirst:
    """Behavioural tests for ``DataFrame.first()``.

    Each test takes a ``spark`` session argument (presumably a pytest fixture
    defined outside this file), builds a small DataFrame and checks what
    ``first()`` returns: a single row object rather than a list, or ``None``
    when the DataFrame has no rows.
    """

    @staticmethod
    def _create_df(spark, data, schema):
        """Create a DataFrame from ``data`` with an explicit ``schema``.

        ``schema`` is passed by keyword first. If the session raises
        ``TypeError`` for that call, it is retried with ``schema`` as the
        second positional argument.

        Args:
            spark: Session object exposing ``createDataFrame``.
            data: Rows to load (a list of dicts in these tests, possibly
                empty).
            schema: Schema object built from the ``_imports()`` types.

        Returns:
            Whatever ``spark.createDataFrame`` returns.
        """
        try:
            return spark.createDataFrame(data, schema=schema)
        except TypeError:
            return spark.createDataFrame(data, schema)

    def test_first_returns_single_row(self, spark):
        """``first()`` returns one row object, not a list of rows."""
        df = spark.createDataFrame([{"name": "Alice"}, {"name": "Bob"}])
        result = df.first()
        assert result is not None
        assert not isinstance(result, list)
        assert result["name"] == "Alice"

    def test_first_empty_dataframe_returns_none(self, spark):
        """``first()`` on a DataFrame with no rows returns ``None``."""
        imp = _imports()
        # An explicit schema is supplied because there are no rows to infer
        # column types from.
        schema = imp.StructType([imp.StructField("name", imp.StringType())])
        df = self._create_df(spark, [], schema)
        result = df.first()
        assert result is None

    def test_first_with_multiple_columns(self, spark):
        """Every column of the first row is accessible by name."""
        data = [
            {"name": "Alice", "age": 25, "city": "NYC"},
            {"name": "Bob", "age": 30, "city": "LA"},
        ]
        df = spark.createDataFrame(data)
        result = df.first()
        assert result["name"] == "Alice"
        assert result["age"] == 25
        assert result["city"] == "NYC"

    def test_first_after_filter(self, spark):
        """``first()`` after a filter returns the first row that passes it."""
        data = [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 35},
        ]
        df = spark.createDataFrame(data)
        filtered = df.filter("age > 25")
        result = filtered.first()
        assert result["name"] == "Bob"
        assert result["age"] == 30

    def test_first_after_orderby(self, spark):
        """``first()`` after ``orderBy`` returns the smallest ``value`` row."""
        data = [
            {"name": "Charlie", "value": 3},
            {"name": "Alice", "value": 1},
            {"name": "Bob", "value": 2},
        ]
        df = spark.createDataFrame(data)
        ordered = df.orderBy("value")
        result = ordered.first()
        assert result["name"] == "Alice"
        assert result["value"] == 1

    def test_first_after_select(self, spark):
        """``first()`` after ``select`` only carries the selected columns."""
        data = [{"name": "Alice", "age": 25}]
        df = spark.createDataFrame(data)
        selected = df.select("name")
        result = selected.first()
        assert result["name"] == "Alice"
        # Fall back to dict() for row objects that do not offer asDict().
        row_dict = result.asDict() if hasattr(result, "asDict") else dict(result)
        assert "age" not in row_dict

    def test_first_vs_head_difference(self, spark):
        """``first()`` and ``head()`` return a row; ``head(1)`` returns a list."""
        data = [{"name": "Alice"}, {"name": "Bob"}]
        df = spark.createDataFrame(data)
        first_result = df.first()
        head_result = df.head()
        head_n_result = df.head(1)
        assert not isinstance(first_result, list)
        assert first_result["name"] == "Alice"
        assert not isinstance(head_result, list)
        assert head_result["name"] == "Alice"
        assert isinstance(head_n_result, list)
        assert len(head_n_result) == 1
        assert head_n_result[0]["name"] == "Alice"

    def test_first_with_null_values(self, spark):
        """A ``None`` field in the first row is returned as ``None``."""
        data = [
            {"name": None, "value": 1},
            {"name": "Bob", "value": 2},
        ]
        df = spark.createDataFrame(data)
        result = df.first()
        assert result["name"] is None
        assert result["value"] == 1

    def test_first_single_row_dataframe(self, spark):
        """``first()`` on a one-row DataFrame returns that row."""
        df = spark.createDataFrame([{"value": 42}])
        result = df.first()
        assert result is not None
        assert result["value"] == 42

    def test_first_after_groupby_agg(self, spark):
        """``first()`` after ``groupBy().agg()`` exposes key and aggregate."""
        data = [
            {"dept": "A", "salary": 100},
            {"dept": "A", "salary": 200},
            {"dept": "B", "salary": 150},
        ]
        df = spark.createDataFrame(data)
        grouped = df.groupBy("dept").agg({"salary": "max"})
        result = grouped.first()
        assert result is not None
        row_dict = result.asDict()
        assert "dept" in row_dict
        # Either naming of the aggregate column is accepted.
        assert "max(salary)" in row_dict or "max_salary" in row_dict

    def test_first_after_join(self, spark):
        """``first()`` after a join returns columns from both sides."""
        df1 = spark.createDataFrame([{"id": 1, "name": "Alice"}])
        df2 = spark.createDataFrame([{"id": 1, "city": "NYC"}])
        joined = df1.join(df2, "id")
        result = joined.first()
        assert result is not None
        assert result["name"] == "Alice"
        assert result["city"] == "NYC"

    def test_first_after_union(self, spark):
        """``first()`` after ``union`` returns a row from one of the inputs."""
        df1 = spark.createDataFrame([{"val": 1}])
        df2 = spark.createDataFrame([{"val": 2}])
        unioned = df1.union(df2)
        result = unioned.first()
        assert result is not None
        # The assertion does not pin which input's row comes first.
        assert result["val"] in [1, 2]

    def test_first_with_nested_struct(self, spark):
        """Nested struct fields of the first row are reachable by key."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField(
                    "address",
                    imp.StructType(
                        [
                            imp.StructField("city", imp.StringType()),
                            imp.StructField("zip", imp.IntegerType()),
                        ]
                    ),
                ),
            ]
        )
        data = [{"name": "Alice", "address": {"city": "NYC", "zip": 10001}}]
        df = self._create_df(spark, data, schema)
        result = df.first()
        assert result is not None
        assert result["name"] == "Alice"
        assert result["address"]["city"] == "NYC"
        assert result["address"]["zip"] == 10001

    def test_first_with_array_column(self, spark):
        """An array column of the first row compares equal to a list."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField("scores", imp.ArrayType(imp.IntegerType())),
            ]
        )
        data = [{"name": "Alice", "scores": [85, 90, 95]}]
        df = self._create_df(spark, data, schema)
        result = df.first()
        assert result is not None
        assert result["name"] == "Alice"
        assert result["scores"] == [85, 90, 95]

    def test_first_after_multiple_transformations(self, spark):
        """``first()`` works at the end of a filter/select/orderBy chain."""
        imp = _imports()
        F = imp.F
        data = [
            {"name": "Alice", "age": 25, "score": 85},
            {"name": "Bob", "age": 30, "score": 90},
            {"name": "Charlie", "age": 35, "score": 75},
        ]
        df = spark.createDataFrame(data)
        result = (
            df.filter(F.col("age") > 25)
            .select("name", "score")
            .orderBy("score")
            .first()
        )
        assert result is not None
        # Alice is removed by the age filter; either remaining row is accepted.
        assert result["name"] in ["Bob", "Charlie"]
        assert result["score"] in [75, 90]
        assert result["name"] != "Alice"

    def test_first_with_different_data_types(self, spark):
        """String, integer, double and boolean values survive ``first()``."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("str_col", imp.StringType()),
                imp.StructField("int_col", imp.IntegerType()),
                imp.StructField("double_col", imp.DoubleType()),
                imp.StructField("bool_col", imp.BooleanType()),
            ]
        )
        data = [
            {
                "str_col": "test",
                "int_col": 42,
                "double_col": 3.14,
                "bool_col": True,
            }
        ]
        df = self._create_df(spark, data, schema)
        result = df.first()
        assert result is not None
        assert result["str_col"] == "test"
        assert result["int_col"] == 42
        assert result["double_col"] == 3.14
        assert result["bool_col"] is True

    def test_first_after_distinct(self, spark):
        """``first()`` after ``distinct`` returns one of the distinct rows."""
        data = [
            {"val": 1},
            {"val": 1},
            {"val": 2},
        ]
        df = spark.createDataFrame(data)
        distinct_df = df.distinct()
        result = distinct_df.first()
        assert result is not None
        assert result["val"] in [1, 2]

    def test_first_with_all_nulls(self, spark):
        """A row whose fields are all ``None`` is still returned as a row."""
        imp = _imports()
        schema = imp.StructType(
            [
                imp.StructField("name", imp.StringType()),
                imp.StructField("age", imp.IntegerType()),
            ]
        )
        data = [{"name": None, "age": None}]
        df = self._create_df(spark, data, schema)
        result = df.first()
        # The row itself must exist even though every field is None.
        assert result is not None
        assert result["name"] is None
        assert result["age"] is None

    def test_first_after_dropna(self, spark):
        """``first()`` after ``dropna`` never returns the all-null row."""
        data = [
            {"name": "Alice", "age": 25},
            {"name": None, "age": None},
            {"name": "Bob", "age": 30},
        ]
        df = spark.createDataFrame(data)
        cleaned = df.dropna()
        result = cleaned.first()
        assert result is not None
        assert result["name"] in ["Alice", "Bob"]
        assert result["age"] is not None