from __future__ import annotations
import robin_sparkless as rs
def test_split_with_limit_two_parts() -> None:
spark = rs.SparkSession.builder().app_name("split_limit").get_or_create()
create_df = getattr(spark, "create_dataframe_from_rows", None) or getattr(
spark, "_create_dataframe_from_rows"
)
df = create_df([{"s": "a,b,c"}], [("s", "string")])
out = df.select(rs.split(rs.col("s"), ",", 2)).collect()
assert len(out) == 1
row = out[0]
parts = [v for v in row.values() if isinstance(v, list)]
assert len(parts) == 1
assert parts[0] == ["a", "b,c"]
def test_split_without_limit_unchanged() -> None:
spark = rs.SparkSession.builder().app_name("split_limit").get_or_create()
create_df = getattr(spark, "create_dataframe_from_rows", None) or getattr(
spark, "_create_dataframe_from_rows"
)
df = create_df([{"s": "a,b,c"}], [("s", "string")])
out = df.select(rs.split(rs.col("s"), ",")).collect()
assert len(out) == 1
parts = [v for v in out[0].values() if isinstance(v, list)]
assert len(parts) == 1
assert parts[0] == ["a", "b", "c"]
def test_column_split_with_limit() -> None:
spark = rs.SparkSession.builder().app_name("split_limit").get_or_create()
create_df = getattr(spark, "create_dataframe_from_rows", None) or getattr(
spark, "_create_dataframe_from_rows"
)
df = create_df([{"s": "a,b,c"}], [("s", "string")])
out = df.select(rs.col("s").split(",", 2)).collect()
assert len(out) == 1
parts = [v for v in out[0].values() if isinstance(v, list)]
assert len(parts) == 1
assert parts[0] == ["a", "b,c"]