from sparkless.testing import get_imports
# ``get_imports`` returns a bundle from which this file takes the session class
# (``SparkSession``) and the functions module (``F``) used by every test below.
_imports = get_imports()
SparkSession, F = _imports.SparkSession, _imports.F
class TestIssue293ExplodeWithColumn:
    """Regression tests for issue 293: ``F.explode`` combined with ``withColumn``.

    Every test builds a small DataFrame containing an array column and
    explodes it, usually via ``withColumn`` and once via ``select``. It then
    checks that each array element produces one output row while the other
    columns are carried along. Each test owns its session and stops it in
    ``finally``.
    """

    def _get_unique_app_name(self, test_name: str) -> str:
        """Return ``issue-293-<test_name>-<8 random hex chars>``.

        The random suffix keeps app names distinct across tests and reruns.
        """
        import uuid

        return f"issue-293-{test_name}-{uuid.uuid4().hex[:8]}"

    def _create_session(self, test_name: str):
        """Return a session (via ``getOrCreate``) whose app name embeds *test_name*.

        The caller is responsible for calling ``stop()`` on the result.
        """
        # The name is passed in explicitly instead of being read from the call
        # stack: inside a test method ``inspect.stack()[1]`` is the test
        # runner's frame rather than the test, and building the full stack
        # (with source context) on every call is needlessly slow.
        return SparkSession.builder.appName(
            self._get_unique_app_name(test_name)
        ).getOrCreate()

    def test_explode_in_withcolumn(self):
        """withColumn + explode yields one row per element and keeps the array."""
        spark = self._create_session("test_explode_in_withcolumn")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": ["2", "3"]},
                    {"Name": "Charlie", "Value": ["4", "5"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Value"))
            rows = df.collect()
            assert len(rows) == 6
            expected = {
                "Alice": ["1", "2"],
                "Bob": ["2", "3"],
                "Charlie": ["4", "5"],
            }
            for name, values in expected.items():
                name_rows = [r for r in rows if r["Name"] == name]
                assert len(name_rows) == 2, name
                assert {r["ExplodedValue"] for r in name_rows} == set(values), name
                # The source array column is preserved unchanged on every row.
                assert all(r["Value"] == values for r in name_rows), name
        finally:
            spark.stop()

    def test_explode_in_select(self):
        """Explode inside select, next to plain columns."""
        spark = self._create_session("test_explode_in_select")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": ["2", "3"]},
                ]
            )
            result = df.select(
                "Name", "Value", F.explode("Value").alias("ExplodedValue")
            )
            rows = result.collect()
            assert len(rows) == 4
            for row in rows:
                row_dict = row.asDict()
                for column in ("Name", "Value", "ExplodedValue"):
                    assert column in row_dict
                assert isinstance(row["Value"], list)
                # Each exploded element must come from that row's own array.
                assert row["ExplodedValue"] in row["Value"]
            exploded_by_name = {
                name: sorted(r["ExplodedValue"] for r in rows if r["Name"] == name)
                for name in ("Alice", "Bob")
            }
            assert exploded_by_name == {"Alice": ["1", "2"], "Bob": ["2", "3"]}
        finally:
            spark.stop()

    def test_explode_with_integers(self):
        """Integer arrays explode into integer rows."""
        spark = self._create_session("test_explode_with_integers")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Numbers": [1, 2, 3]},
                    {"Name": "Bob", "Numbers": [4, 5]},
                ]
            )
            df = df.withColumn("ExplodedNumber", F.explode("Numbers"))
            rows = df.collect()
            assert len(rows) == 5
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 3
            assert {r["ExplodedNumber"] for r in alice_rows} == {1, 2, 3}
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 2
            assert {r["ExplodedNumber"] for r in bob_rows} == {4, 5}
        finally:
            spark.stop()

    def test_explode_with_empty_arrays(self):
        """explode drops rows whose array is empty."""
        spark = self._create_session("test_explode_with_empty_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": []},
                    {"Name": "Charlie", "Value": ["3"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Value"))
            rows = df.collect()
            assert len(rows) == 3
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 0
            assert sorted(r["Name"] for r in rows) == ["Alice", "Alice", "Charlie"]
        finally:
            spark.stop()

    def test_explode_with_null_arrays(self):
        """explode (unlike explode_outer) drops rows whose array is null."""
        spark = self._create_session("test_explode_with_null_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": None},
                    {"Name": "Charlie", "Value": ["3"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Value"))
            rows = df.collect()
            assert len(rows) == 3
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 0
            assert sorted(r["Name"] for r in rows) == ["Alice", "Alice", "Charlie"]
        finally:
            spark.stop()

    def test_explode_outer_with_null_arrays(self):
        """explode_outer keeps a null array as a single row with a null element."""
        spark = self._create_session("test_explode_outer_with_null_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": None},
                    {"Name": "Charlie", "Value": ["3"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode_outer("Value"))
            rows = df.collect()
            assert len(rows) == 4
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 1
            assert bob_rows[0]["ExplodedValue"] is None
        finally:
            spark.stop()

    def test_explode_with_multiple_columns(self):
        """Non-array columns are copied onto every exploded row."""
        spark = self._create_session("test_explode_with_multiple_columns")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Age": 30, "Tags": ["a", "b"]},
                    {"Name": "Bob", "Age": 25, "Tags": ["c"]},
                ]
            )
            df = df.withColumn("Tag", F.explode("Tags"))
            rows = df.collect()
            assert len(rows) == 3
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 2
            assert all(r["Age"] == 30 for r in alice_rows)
            assert {r["Tag"] for r in alice_rows} == {"a", "b"}
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 1
            assert bob_rows[0]["Age"] == 25
            assert bob_rows[0]["Tag"] == "c"
        finally:
            spark.stop()

    def test_explode_chained_operations(self):
        """An exploded column can be filtered on immediately afterwards."""
        spark = self._create_session("test_explode_chained_operations")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": ["3", "4"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Value"))
            # The elements are strings, so this is a string comparison.
            df = df.filter(F.col("ExplodedValue") > "2")
            rows = df.collect()
            assert len(rows) == 2
            assert sorted(r["ExplodedValue"] for r in rows) == ["3", "4"]
        finally:
            spark.stop()

    def test_explode_with_floats(self):
        """Float arrays explode into float rows."""
        spark = self._create_session("test_explode_with_floats")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1.5, 2.5, 3.5]},
                    {"Name": "Bob", "Values": [4.0, 5.0]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            rows = df.collect()
            assert len(rows) == 5
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 3
            assert {r["ExplodedValue"] for r in alice_rows} == {1.5, 2.5, 3.5}
            bob_values = {r["ExplodedValue"] for r in rows if r["Name"] == "Bob"}
            assert bob_values == {4.0, 5.0}
        finally:
            spark.stop()

    def test_explode_with_booleans(self):
        """Boolean arrays explode into boolean rows, duplicates included."""
        spark = self._create_session("test_explode_with_booleans")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Flags": [True, False, True]},
                    {"Name": "Bob", "Flags": [False]},
                ]
            )
            df = df.withColumn("ExplodedFlag", F.explode("Flags"))
            rows = df.collect()
            assert len(rows) == 4
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 3
            assert {r["ExplodedFlag"] for r in alice_rows} == {True, False}
            bob_flags = [r["ExplodedFlag"] for r in rows if r["Name"] == "Bob"]
            assert bob_flags == [False]
        finally:
            spark.stop()

    def test_explode_with_mixed_types(self):
        """String arrays whose contents look like different types stay strings."""
        spark = self._create_session("test_explode_with_mixed_types")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Mixed": ["1", "two", "3.0"]},
                    {"Name": "Bob", "Mixed": ["a", "b"]},
                ]
            )
            df = df.withColumn("ExplodedMixed", F.explode("Mixed"))
            rows = df.collect()
            assert len(rows) == 5
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 3
            exploded_values = {r["ExplodedMixed"] for r in alice_rows}
            assert exploded_values == {"1", "two", "3.0"}
        finally:
            spark.stop()

    def test_explode_with_single_element_arrays(self):
        """A one-element array yields exactly one row."""
        spark = self._create_session("test_explode_with_single_element_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": [1]},
                    {"Name": "Bob", "Value": [42]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Value"))
            rows = df.collect()
            assert len(rows) == 2
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(alice_rows) == 1
            assert alice_rows[0]["ExplodedValue"] == 1
            assert len(bob_rows) == 1
            assert bob_rows[0]["ExplodedValue"] == 42
        finally:
            spark.stop()

    def test_explode_with_large_arrays(self):
        """Arrays with 100 elements explode completely."""
        spark = self._create_session("test_explode_with_large_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": list(range(100))},
                    {"Name": "Bob", "Values": list(range(50, 150))},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            rows = df.collect()
            assert len(rows) == 200
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 100
            assert {r["ExplodedValue"] for r in alice_rows} == set(range(100))
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 100
            assert {r["ExplodedValue"] for r in bob_rows} == set(range(50, 150))
        finally:
            spark.stop()

    def test_explode_with_groupby_agg(self):
        """Exploded values can be summed per group."""
        spark = self._create_session("test_explode_with_groupby_agg")
        try:
            df = spark.createDataFrame(
                [
                    {"Category": "A", "Values": [1, 2, 3]},
                    {"Category": "A", "Values": [4, 5]},
                    {"Category": "B", "Values": [6, 7]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            result = df.groupBy("Category").agg(F.sum("ExplodedValue").alias("Total"))
            rows = result.collect()
            assert len(rows) == 2
            totals = {r["Category"]: r["Total"] for r in rows}
            # A: 1+2+3+4+5, B: 6+7
            assert totals == {"A": 15, "B": 13}
        finally:
            spark.stop()

    def test_explode_with_orderby(self):
        """Rows can be ordered by the exploded column."""
        spark = self._create_session("test_explode_with_orderby")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [3, 1, 2]},
                    {"Name": "Bob", "Values": [5, 4]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.orderBy("ExplodedValue")
            rows = df.collect()
            assert len(rows) == 5
            assert [r["ExplodedValue"] for r in rows] == [1, 2, 3, 4, 5]
        finally:
            spark.stop()

    def test_explode_with_distinct(self):
        """distinct() collapses duplicate exploded values."""
        spark = self._create_session("test_explode_with_distinct")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 2, 3]},
                    {"Name": "Bob", "Values": [2, 3, 3]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.select("ExplodedValue").distinct()
            rows = df.collect()
            # 2 and 3 occur several times across both arrays; each must
            # survive exactly once, and no null rows may appear.
            assert len(rows) == 3
            assert sorted(r["ExplodedValue"] for r in rows) == [1, 2, 3]
        finally:
            spark.stop()

    def test_explode_with_union(self):
        """Two independently exploded frames can be unioned."""
        spark = self._create_session("test_explode_with_union")
        try:
            df1 = spark.createDataFrame([{"Name": "Alice", "Values": [1, 2]}])
            df2 = spark.createDataFrame([{"Name": "Bob", "Values": [3, 4]}])
            df1 = df1.withColumn("ExplodedValue", F.explode("Values"))
            df2 = df2.withColumn("ExplodedValue", F.explode("Values"))
            result = df1.union(df2)
            rows = result.collect()
            assert len(rows) == 4
            assert sorted(r["ExplodedValue"] for r in rows) == [1, 2, 3, 4]
        finally:
            spark.stop()

    def test_explode_with_computed_column(self):
        """A column derived from the exploded value is computed per row."""
        spark = self._create_session("test_explode_with_computed_column")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 3]},
                    {"Name": "Bob", "Values": [4, 5]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.withColumn("Doubled", F.col("ExplodedValue") * 2)
            rows = df.collect()
            assert len(rows) == 5
            for row in rows:
                assert row["Doubled"] == row["ExplodedValue"] * 2
            assert sorted(r["Doubled"] for r in rows) == [2, 4, 6, 8, 10]
        finally:
            spark.stop()

    def test_explode_with_when_otherwise(self):
        """when/otherwise can branch on the exploded value."""
        spark = self._create_session("test_explode_with_when_otherwise")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 3, 4, 5]},
                    {"Name": "Bob", "Values": [6, 7, 8]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.withColumn(
                "Category",
                F.when(F.col("ExplodedValue") > 4, "High").otherwise("Low"),
            )
            rows = df.collect()
            assert len(rows) == 8
            for row in rows:
                expected = "High" if row["ExplodedValue"] > 4 else "Low"
                assert row["Category"] == expected
            categories = [r["Category"] for r in rows]
            assert categories.count("High") == 4
            assert categories.count("Low") == 4
        finally:
            spark.stop()

    def test_explode_with_cast(self):
        """Exploded string values can be cast to int."""
        spark = self._create_session("test_explode_with_cast")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": ["1", "2", "3"]},
                    {"Name": "Bob", "Values": ["4", "5"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.withColumn("AsInt", F.col("ExplodedValue").cast("int"))
            rows = df.collect()
            assert len(rows) == 5
            for row in rows:
                assert isinstance(row["AsInt"], int)
                assert row["AsInt"] == int(row["ExplodedValue"])
        finally:
            spark.stop()

    def test_explode_with_string_operations(self):
        """String functions apply to exploded string values."""
        spark = self._create_session("test_explode_with_string_operations")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Words": ["hello", "world"]},
                    {"Name": "Bob", "Words": ["test"]},
                ]
            )
            df = df.withColumn("ExplodedWord", F.explode("Words"))
            df = df.withColumn("Uppercase", F.upper(F.col("ExplodedWord")))
            rows = df.collect()
            assert len(rows) == 3
            for row in rows:
                assert row["Uppercase"] == row["ExplodedWord"].upper()
        finally:
            spark.stop()

    def test_explode_with_multiple_explodes(self):
        """Two successive explodes produce the per-row cross product."""
        spark = self._create_session("test_explode_with_multiple_explodes")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Tags": ["a", "b"], "Numbers": [1, 2]},
                    {"Name": "Bob", "Tags": ["c"], "Numbers": [3]},
                ]
            )
            df = df.withColumn("ExplodedTag", F.explode("Tags"))
            df = df.withColumn("ExplodedNumber", F.explode("Numbers"))
            rows = df.collect()
            # Alice: 2 tags x 2 numbers, Bob: 1 x 1.
            assert len(rows) == 5
            alice_rows = [r for r in rows if r["Name"] == "Alice"]
            assert len(alice_rows) == 4
            alice_pairs = {(r["ExplodedTag"], r["ExplodedNumber"]) for r in alice_rows}
            assert alice_pairs == {("a", 1), ("a", 2), ("b", 1), ("b", 2)}
            bob_pairs = [
                (r["ExplodedTag"], r["ExplodedNumber"])
                for r in rows
                if r["Name"] == "Bob"
            ]
            assert bob_pairs == [("c", 3)]
        finally:
            spark.stop()

    def test_explode_with_join(self):
        """An exploded frame joins correctly on a key column."""
        spark = self._create_session("test_explode_with_join")
        try:
            df1 = spark.createDataFrame(
                [
                    {"ID": 1, "Values": [1, 2]},
                    {"ID": 2, "Values": [3, 4]},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"ID": 1, "Name": "Alice"},
                    {"ID": 2, "Name": "Bob"},
                ]
            )
            df1 = df1.withColumn("ExplodedValue", F.explode("Values"))
            result = df1.join(df2, on="ID", how="inner")
            rows = result.collect()
            assert len(rows) == 4
            for row in rows:
                assert "Name" in row.asDict()
                assert "ExplodedValue" in row.asDict()
            triples = sorted((r["ID"], r["Name"], r["ExplodedValue"]) for r in rows)
            assert triples == [
                (1, "Alice", 1),
                (1, "Alice", 2),
                (2, "Bob", 3),
                (2, "Bob", 4),
            ]
        finally:
            spark.stop()

    def test_explode_outer_with_empty_arrays(self):
        """explode_outer keeps an empty array as a single row with a null element."""
        spark = self._create_session("test_explode_outer_with_empty_arrays")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": ["1", "2"]},
                    {"Name": "Bob", "Value": []},
                    {"Name": "Charlie", "Value": ["3"]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode_outer("Value"))
            rows = df.collect()
            assert len(rows) == 4
            bob_rows = [r for r in rows if r["Name"] == "Bob"]
            assert len(bob_rows) == 1
            assert bob_rows[0]["ExplodedValue"] is None
        finally:
            spark.stop()

    def test_explode_with_alias(self):
        """The withColumn name is used even when the explode is aliased."""
        spark = self._create_session("test_explode_with_alias")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 3]},
                ]
            )
            df = df.withColumn("E", F.explode("Values").alias("Exploded"))
            rows = df.collect()
            assert len(rows) == 3
            assert "E" in rows[0].asDict()
            assert sorted(r["E"] for r in rows) == [1, 2, 3]
        finally:
            spark.stop()

    def test_explode_with_filter_after(self):
        """Filtering after the explode works on the exploded values."""
        spark = self._create_session("test_explode_with_filter_after")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 3, 4, 5]},
                    {"Name": "Bob", "Values": [6, 7, 8]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.filter(F.col("ExplodedValue") > 5)
            rows = df.collect()
            assert len(rows) == 3
            assert sorted(r["ExplodedValue"] for r in rows) == [6, 7, 8]
        finally:
            spark.stop()

    def test_explode_with_filter_before(self):
        """Filtering before the explode only explodes the surviving rows."""
        spark = self._create_session("test_explode_with_filter_before")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Age": 30, "Values": [1, 2, 3]},
                    {"Name": "Bob", "Age": 20, "Values": [4, 5]},
                ]
            )
            df = df.filter(F.col("Age") > 25)
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            rows = df.collect()
            assert len(rows) == 3
            for row in rows:
                assert row["Name"] == "Alice"
                assert row["Age"] == 30
            assert sorted(r["ExplodedValue"] for r in rows) == [1, 2, 3]
        finally:
            spark.stop()

    def test_explode_with_select_subset(self):
        """Selecting a subset after the explode keeps only those columns."""
        spark = self._create_session("test_explode_with_select_subset")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Age": 30, "Values": [1, 2, 3]},
                    {"Name": "Bob", "Age": 25, "Values": [4, 5]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            df = df.select("Name", "ExplodedValue")
            rows = df.collect()
            assert len(rows) == 5
            for row in rows:
                assert set(row.asDict()) == {"Name", "ExplodedValue"}
        finally:
            spark.stop()

    def test_explode_with_count(self):
        """count() per group reflects the number of exploded elements."""
        spark = self._create_session("test_explode_with_count")
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Values": [1, 2, 3]},
                    {"Name": "Bob", "Values": [4, 5]},
                ]
            )
            df = df.withColumn("ExplodedValue", F.explode("Values"))
            result = df.groupBy("Name").agg(F.count("ExplodedValue").alias("Count"))
            rows = result.collect()
            assert len(rows) == 2
            counts = {r["Name"]: r["Count"] for r in rows}
            assert counts == {"Alice": 3, "Bob": 2}
        finally:
            spark.stop()