from sparkless.testing import Mode
from sparkless.testing import get_imports
def test_posexplode_without_alias_no_type_error(spark, spark_mode):
    """Select an un-aliased ``posexplode`` beside a plain column and show it.

    Asserts that ``pos``, ``col`` and ``Name`` all appear in the selected
    frame's columns, then calls ``show()`` so that any exception raised
    while displaying the result fails the test.
    """
    fn = get_imports(spark_mode).F
    records = [
        {"Name": "Alice", "Values": [10, 20]},
        {"Name": "Bob", "Values": [30, 40]},
    ]
    source = spark.createDataFrame(records)
    exploded = source.select("Name", fn.posexplode("Values"))
    for expected in ("pos", "col", "Name"):
        assert expected in exploded.columns
    # No assertion on the output itself: the call only has to complete.
    exploded.show()
def test_posexplode_without_alias_schema_projection(spark, spark_mode):
    """Check the schema produced by selecting an un-aliased ``posexplode``.

    The schema must be available (not ``None``) and its field names must
    include ``pos``, ``col`` and the pass-through ``id`` column.
    """
    fn = get_imports(spark_mode).F
    source = spark.createDataFrame([{"id": 1, "arr": [10, 20, 30]}])
    projected = source.select("id", fn.posexplode("arr"))
    projected_schema = projected.schema
    assert projected_schema is not None
    names = [field.name for field in projected_schema.fields]
    for expected in ("pos", "col", "id"):
        assert expected in names
def test_posexplode_outer_without_alias_no_type_error(spark, spark_mode):
    """Select an un-aliased ``posexplode_outer`` over data with a null array.

    One input row carries ``None`` for ``arr``. The test asserts that
    ``pos`` and ``col`` appear in the result columns and that the result
    schema is not ``None``.
    """
    fn = get_imports(spark_mode).F
    rows = [(1, [10, 20]), (2, None)]
    source = spark.createDataFrame(rows, schema="id: int, arr: array<int>")
    exploded = source.select("id", fn.posexplode_outer("arr"))
    for expected in ("pos", "col"):
        assert expected in exploded.columns
    assert exploded.schema is not None
def test_posexplode_without_alias_chained_operations(spark, spark_mode):
    """Chain ``filter`` and ``limit`` after an un-aliased ``posexplode``.

    The filter refers to the generated ``pos`` column by name. After
    filtering and limiting, the frame must still list ``pos`` and ``col``
    among its columns and expose a schema that is not ``None``.
    """
    fn = get_imports(spark_mode).F
    source = spark.createDataFrame(
        [
            {"name": "A", "vals": [1, 2]},
            {"name": "B", "vals": [3, 4, 5]},
        ]
    )
    exploded = source.select("name", fn.posexplode("vals"))
    past_first = exploded.filter(fn.col("pos") >= 1)
    limited = past_first.limit(5)
    assert "pos" in limited.columns
    assert "col" in limited.columns
    assert limited.schema is not None
def test_posexplode_without_alias_empty_array(spark, spark_mode):
    """Select an un-aliased ``posexplode`` when one row has an empty array.

    Only the shape of the result is checked: ``pos`` and ``col`` must be
    among the columns and the schema must contain exactly three fields.
    """
    fn = get_imports(spark_mode).F
    rows = [
        {"id": 1, "arr": []},
        {"id": 2, "arr": [10]},
    ]
    source = spark.createDataFrame(rows, schema="id: int, arr: array<int>")
    exploded = source.select("id", fn.posexplode("arr"))
    # Read the schema before the column checks, as the original does.
    exploded_schema = exploded.schema
    assert "pos" in exploded.columns
    assert "col" in exploded.columns
    assert len(exploded_schema.fields) == 3
def test_posexplode_without_alias_single_element(spark, spark_mode):
    """Collect an un-aliased ``posexplode`` of a one-element array.

    Every backend must return at least one row and expose ``pos`` and
    ``col`` columns. The row values (``pos == 0``, ``col == 42``) are
    checked only when running in ``Mode.PYSPARK`` and exactly one row
    came back.
    """
    fn = get_imports(spark_mode).F
    source = spark.createDataFrame([{"id": 1, "arr": [42]}])
    exploded = source.select("id", fn.posexplode("arr"))
    collected = exploded.collect()
    assert len(collected) > 0
    assert "pos" in exploded.columns
    assert "col" in exploded.columns
    # Value checks apply only to a single-row result under the PySpark mode.
    if spark_mode != Mode.PYSPARK or len(collected) != 1:
        return
    only_row = collected[0]
    assert only_row["pos"] == 0
    assert only_row["col"] == 42
def test_posexplode_without_alias_mixed_columns(spark, spark_mode):
    """Select an un-aliased ``posexplode`` placed between two plain columns.

    The result passes if its columns are exactly ``a, pos, col, b`` or,
    failing that, if ``pos`` is present at all.
    """
    fn = get_imports(spark_mode).F
    source = spark.createDataFrame([{"a": "x", "arr": [1, 2], "b": 10}])
    exploded = source.select("a", fn.posexplode("arr"), "b")
    cols = exploded.columns
    # NOTE(review): the fallback accepts any column list containing "pos",
    # so the exact-order comparison is not binding. Presumably this is
    # deliberate leniency across backends; confirm before tightening.
    assert cols == ["a", "pos", "col", "b"] or "pos" in cols
def test_posexplode_without_alias_column_object(spark, spark_mode):
    """Select an un-aliased ``posexplode`` given a column object, not a name.

    The argument is built with ``F.col("x")``. The result must list ``pos``
    and ``col`` among its columns and expose a schema that is not ``None``.
    """
    fn = get_imports(spark_mode).F
    source = spark.createDataFrame([{"x": [1, 2, 3]}])
    array_column = fn.col("x")
    exploded = source.select(fn.posexplode(array_column))
    assert "pos" in exploded.columns
    assert "col" in exploded.columns
    assert exploded.schema is not None