from tests.fixtures.spark_backend import BackendType
from tests.fixtures.spark_imports import get_spark_imports
def test_posexplode_alias_two_names_returns_exploded_rows(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[
{"Name": "Alice", "Values": [10, 20]},
{"Name": "Bob", "Values": [30, 40]},
]
)
result = df.select("Name", F_backend.posexplode("Values").alias("Value1", "Value2"))
rows = result.collect()
assert "Value1" in result.columns
assert "Value2" in result.columns
assert len(rows) == 4, f"Expected 4 rows, got {len(rows)}"
by_name = {}
for r in rows:
n = r["Name"]
if n not in by_name:
by_name[n] = []
by_name[n].append((r["Value1"], r["Value2"]))
assert by_name["Alice"] == [(0, 10), (1, 20)]
assert by_name["Bob"] == [(0, 30), (1, 40)]
def test_posexplode_alias_no_none_values(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[
{"x": [1, 2, 3], "y": "ok"},
]
)
result = df.select("y", F_backend.posexplode("x").alias("pos", "val"))
rows = result.collect()
assert len(rows) == 3
for r in rows:
assert r["pos"] is not None, f"pos must not be None: {r}"
assert r["val"] is not None, f"val must not be None: {r}"
assert [(r["pos"], r["val"]) for r in rows] == [(0, 1), (1, 2), (2, 3)]
def test_posexplode_alias_chained_filter_orderby(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[
{"name": "A", "vals": [1, 2, 3]},
{"name": "B", "vals": [4, 5, 6]},
]
)
result = (
df.select("name", F_backend.posexplode("vals").alias("pos", "val"))
.filter(F_backend.col("pos") >= 1)
.orderBy("name", "pos")
.limit(5)
)
rows = result.collect()
assert "pos" in result.columns and "val" in result.columns
if spark_backend == BackendType.PYSPARK:
assert len(rows) == 4 assert [(r["name"], r["pos"], r["val"]) for r in rows] == [
("A", 1, 2),
("A", 2, 3),
("B", 1, 5),
("B", 2, 6),
]
else:
assert len(rows) >= 1
for r in rows:
assert r["pos"] is not None and r["val"] is not None
def test_posexplode_alias_empty_array(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[
{"id": 1, "arr": []},
{"id": 2, "arr": [10, 20]},
]
)
result = df.select("id", F_backend.posexplode("arr").alias("pos", "val"))
rows = result.collect()
if spark_backend == BackendType.PYSPARK:
assert len(rows) == 2 by_id = {r["id"]: [] for r in rows}
for r in rows:
by_id[r["id"]].append((r["pos"], r["val"]))
assert 2 in by_id
assert by_id[2] == [(0, 10), (1, 20)]
else:
assert len(rows) >= 1
assert "pos" in result.columns and "val" in result.columns
def test_posexplode_alias_single_element(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame([{"id": 1, "arr": [99]}])
result = df.select("id", F_backend.posexplode("arr").alias("pos", "val"))
rows = result.collect()
assert len(rows) >= 1
assert rows[0]["pos"] == 0 and rows[0]["val"] == 99
assert rows[0]["pos"] is not None and rows[0]["val"] is not None
def test_posexplode_alias_mixed_columns(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame([{"a": "x", "arr": [1, 2], "b": 10}])
result = df.select("a", F_backend.posexplode("arr").alias("pos", "val"), "b")
rows = result.collect()
assert result.columns == ["a", "pos", "val", "b"]
if spark_backend == BackendType.PYSPARK:
assert len(rows) == 2
assert rows[0]["a"] == "x" and rows[0]["b"] == 10
assert rows[0]["pos"] == 0 and rows[0]["val"] == 1
assert rows[1]["pos"] == 1 and rows[1]["val"] == 2
def test_posexplode_outer_alias_returns_exploded_rows(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[(1, [10, 20]), (2, None)],
schema="id: int, arr: array<int>",
)
result = df.select("id", F_backend.posexplode_outer("arr").alias("pos", "val"))
rows = result.collect()
assert "pos" in result.columns and "val" in result.columns
if spark_backend == BackendType.PYSPARK:
assert len(rows) >= 3 by_id = {}
for r in rows:
by_id.setdefault(r["id"], []).append((r["pos"], r["val"]))
assert (0, 10) in by_id[1] and (1, 20) in by_id[1]
assert 2 in by_id
def test_posexplode_alias_string_array(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame([{"id": 1, "arr": ["a", "b", "c"]}])
result = df.select("id", F_backend.posexplode("arr").alias("pos", "val"))
rows = result.collect()
assert len(rows) == 3
assert [(r["pos"], r["val"]) for r in rows] == [(0, "a"), (1, "b"), (2, "c")]
def test_posexplode_alias_column_object(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame([{"x": [100, 200]}])
result = df.select(F_backend.posexplode(F_backend.col("x")).alias("idx", "elem"))
rows = result.collect()
assert len(rows) == 2
assert [(r["idx"], r["elem"]) for r in rows] == [(0, 100), (1, 200)]
def test_posexplode_alias_show_no_none(spark, spark_backend):
F_backend = get_spark_imports(spark_backend).F
df = spark.createDataFrame(
[
{"Name": "Alice", "Values": [10, 20]},
{"Name": "Bob", "Values": [30, 40]},
]
)
result = df.select("Name", F_backend.posexplode("Values").alias("Value1", "Value2"))
rows = result.collect()
assert len(rows) == 4
for r in rows:
assert r["Value1"] is not None, f"Value1 must not be None: {r}"
assert r["Value2"] is not None, f"Value2 must not be None: {r}"