from tests.fixtures.spark_imports import get_spark_imports
# Obtain the Spark entry points through the shared fixture helper rather than
# importing them directly.
# NOTE(review): presumably this lets the suite run against more than one
# Spark-compatible backend -- confirm in tests/fixtures/spark_imports.
_imports = get_spark_imports()
# Session class exposed by the helper's return value.
SparkSession = _imports.SparkSession
# Functions namespace; the tests below use posexplode, posexplode_outer and explode from it.
F = _imports.F
class TestIssue366AliasPosexplode:
def _get_unique_app_name(self, test_name: str) -> str:
import os
import threading
thread_id = threading.current_thread().ident
process_id = os.getpid()
return f"{test_name}_{process_id}_{thread_id}"
def test_posexplode_alias_two_names_select(self, spark):
df = spark.createDataFrame(
[
{"Name": "Alice", "Values": [10, 20]},
{"Name": "Bob", "Values": [30, 40]},
]
)
result = df.select("Name", F.posexplode("Values").alias("Value1", "Value2"))
rows = result.collect()
assert len(rows) == 4
keys = list(rows[0].asDict().keys()) if rows else []
assert "Name" in keys and "Value1" in keys and "Value2" in keys
by_name = {r["Name"]: [] for r in rows}
for r in rows:
by_name[r["Name"]].append((r["Value1"], r["Value2"]))
assert by_name["Alice"] == [(0, 10), (1, 20)]
assert by_name["Bob"] == [(0, 30), (1, 40)]
def test_posexplode_alias_two_names_no_type_error(self, spark):
df = spark.createDataFrame([{"x": [1, 2], "y": "ok"}])
result = df.select("y", F.posexplode("x").alias("pos", "val"))
rows = result.collect()
assert len(rows) >= 1
keys = list(rows[0].asDict().keys()) if rows else []
assert "y" in keys and "pos" in keys and "val" in keys
def test_posexplode_alias_two_names_single_element(self, spark):
df = spark.createDataFrame([{"id": 1, "arr": [42]}])
result = df.select("id", F.posexplode("arr").alias("idx", "elem"))
rows = result.collect()
assert len(rows) == 1
assert rows[0]["id"] == 1
assert rows[0]["idx"] == 0 and rows[0]["elem"] == 42
def test_posexplode_alias_two_names_empty_array(self, spark):
df = spark.createDataFrame([{"id": 1, "arr": []}, {"id": 2, "arr": [10, 20]}])
result = df.select("id", F.posexplode("arr").alias("pos", "val"))
rows = result.collect()
assert len(rows) == 2
by_id = {r["id"]: [] for r in rows}
for r in rows:
by_id[r["id"]].append((r["pos"], r["val"]))
assert 2 in by_id
assert by_id[2] == [(0, 10), (1, 20)]
def test_posexplode_outer_alias_two_names(self, spark):
df = spark.createDataFrame(
[(1, [10, 20]), (2, None)], schema="id: int, arr: array<int>"
)
result = df.select("id", F.posexplode_outer("arr").alias("pos", "val"))
rows = result.collect()
assert len(rows) >= 3
ids = [r["id"] for r in rows]
assert 1 in ids and 2 in ids
by_id = {}
for r in rows:
by_id.setdefault(r["id"], []).append((r["pos"], r["val"]))
assert (0, 10) in by_id[1] and (1, 20) in by_id[1]
assert 2 in by_id
def test_posexplode_alias_select_two_names(self, spark):
df = spark.createDataFrame(
[
{"Name": "Alice", "Values": [10, 20]},
{"Name": "Bob", "Values": [30, 40]},
]
)
result = df.select("Name", F.posexplode("Values").alias("Value1", "col"))
rows = result.collect()
assert len(rows) >= 1
keys = list(rows[0].asDict().keys()) if rows else []
assert "Name" in keys and "Value1" in keys
def test_alias_empty_raises(self, spark):
df = spark.createDataFrame([{"Values": [1, 2]}])
result = df.select(F.posexplode("Values").alias())
rows = result.collect()
assert len(rows) == 2
assert result.columns == ["pos", "col"]
def test_explode_alias_single_name(self, spark):
df = spark.createDataFrame([{"arr": [1, 2, 3]}])
result = df.select(F.explode("arr").alias("num"))
rows = result.collect()
assert len(rows) == 3
assert [r["num"] for r in rows] == [1, 2, 3]
def test_posexplode_nested_arrays(self, spark):
df = spark.createDataFrame([{"nested": [[1, 2], [3, 4]]}])
result = df.select(F.posexplode("nested").alias("idx", "col"))
rows = result.collect()
assert len(rows) >= 1
assert "idx" in (rows[0].asDict().keys() if rows else [])