from sparkless.testing import get_imports
def test_between_string_column_numeric_bounds_exact_issue_445(spark, spark_mode):
    """Filter a string ``Value`` column with integer bounds ``between(1, 6)``.

    Only the row holding ``"5"`` (Bob) is expected back; the ``"10"`` row must
    be dropped. The surviving value is still the original string.
    """
    functions = get_imports(spark_mode).F
    people = [
        {"Name": "Alice", "Value": "10"},
        {"Name": "Bob", "Value": "5"},
    ]
    frame = spark.createDataFrame(people)
    in_range = functions.col("Value").between(1, 6)
    matched = frame.filter(in_range).collect()
    assert len(matched) == 1
    survivor = matched[0]
    assert survivor["Name"] == "Bob"
    assert survivor["Value"] == "5"
def test_between_string_column_numeric_bounds_both_in_range(spark, spark_mode):
    """Both string values (``"3"`` and ``"7"``) must pass ``between(1, 10)``."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "3"},
        {"id": 2, "val": "7"},
    ]
    frame = spark.createDataFrame(records)
    kept = frame.filter(functions.col("val").between(1, 10)).collect()
    assert len(kept) == 2
def test_between_string_column_numeric_bounds_none_in_range(spark, spark_mode):
    """Neither ``"100"`` nor ``"200"`` may pass ``between(1, 10)``."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "100"},
        {"id": 2, "val": "200"},
    ]
    frame = spark.createDataFrame(records)
    kept = frame.filter(functions.col("val").between(1, 10)).collect()
    assert len(kept) == 0
def test_between_string_column_float_bounds(spark, spark_mode):
    """Float bounds ``between(1.0, 5.0)`` keep ``"3.5"`` and drop ``"7.2"``."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "3.5"},
        {"id": 2, "val": "7.2"},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(1.0, 5.0)
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "3.5"
def test_between_string_column_invalid_numeric_returns_null(spark, spark_mode):
    """A non-numeric string (``"abc"``) must not pass ``between(1, 10)``.

    Only the ``"5"`` row is expected in the filtered output.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "abc"},
        {"id": 2, "val": "5"},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(1, 10)
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "5"
def test_between_integer_column_numeric_bounds_unchanged(spark, spark_mode):
    """Integer column control case: ``between(1, 10)`` keeps 5 and drops 15."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": 5},
        {"id": 2, "val": 15},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(1, 10)
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == 5
def test_between_string_column_with_lit_bounds(spark, spark_mode):
    """Bounds passed as ``F.lit(1)`` / ``F.lit(10)`` keep ``"4"``, drop ``"20"``."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "4"},
        {"id": 2, "val": "20"},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(functions.lit(1), functions.lit(10))
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "4"
def test_between_string_column_in_select_expression(spark, spark_mode):
    """Use ``between(1, 10)`` on a string column as a projected boolean.

    The ``in_range`` column must be ``True`` for the ``"5"`` row (id 1) and
    ``False`` for the ``"15"`` row (id 2).
    """
    F = get_imports(spark_mode).F
    df = spark.createDataFrame(
        [
            {"id": 1, "val": "5"},
            {"id": 2, "val": "15"},
        ]
    )
    result = df.select(
        F.col("id"),
        F.col("val"),
        F.col("val").between(1, 10).alias("in_range"),
    )
    rows = result.collect()
    assert len(rows) == 2
    # Look results up by id instead of by position: no orderBy is applied, so
    # the test should not depend on the order in which collect() yields rows.
    in_range_by_id = {r["id"]: r["in_range"] for r in rows}
    assert in_range_by_id[1] is True
    assert in_range_by_id[2] is False
def test_between_string_column_inclusive_boundaries(spark, spark_mode):
    """Both endpoints of ``between(1, 10)`` are inclusive for string values.

    ``"1"``, ``"5"`` and ``"10"`` are expected back; ``"11"`` is not.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "1"},
        {"id": 2, "val": "5"},
        {"id": 3, "val": "10"},
        {"id": 4, "val": "11"},
    ]
    frame = spark.createDataFrame(records)
    kept = frame.filter(functions.col("val").between(1, 10)).collect()
    assert len(kept) == 3
    # Compare as a set so the check does not depend on output order.
    seen = {row["val"] for row in kept}
    assert seen == {"1", "5", "10"}
def test_between_string_column_null_excluded(spark, spark_mode):
    """A ``None`` value must not pass ``between(1, 10)``; only id 1 remains."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "5"},
        {"id": 2, "val": None},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(1, 10)
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["id"] == 1
def test_between_string_column_negative_numbers(spark, spark_mode):
    """Negative bounds ``between(-10, 0)`` keep ``"-5"`` only.

    ``"5"`` lies above the range and ``"-15"`` below it.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "-5"},
        {"id": 2, "val": "5"},
        {"id": 3, "val": "-15"},
    ]
    frame = spark.createDataFrame(records)
    within = functions.col("val").between(-10, 0)
    kept = frame.filter(within).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "-5"
def test_between_string_column_then_orderby(spark, spark_mode):
    """Filter with ``between(1, 10)`` and then sort by the same column.

    All three rows pass the filter and are expected in the order
    ``"3"``, ``"5"``, ``"8"``.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "8"},
        {"id": 2, "val": "3"},
        {"id": 3, "val": "5"},
    ]
    frame = spark.createDataFrame(records)
    filtered = frame.filter(functions.col("val").between(1, 10))
    ordered = filtered.orderBy(functions.col("val")).collect()
    assert len(ordered) == 3
    observed = [row["val"] for row in ordered]
    assert observed == ["3", "5", "8"]
def test_between_string_column_in_when_otherwise(spark, spark_mode):
    """Use ``between`` on a string column as the condition of ``when`` chains.

    Expected tiers: ``"3"`` -> ``"low"``, ``"7"`` -> ``"mid"``,
    ``"15"`` -> ``"high"``.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "3"},
        {"id": 2, "val": "7"},
        {"id": 3, "val": "15"},
    ]
    frame = spark.createDataFrame(records)
    tier_expr = (
        functions.when(functions.col("val").between(1, 5), "low")
        .when(functions.col("val").between(6, 10), "mid")
        .otherwise("high")
    )
    labelled = frame.withColumn("tier", tier_expr).collect()
    # Index by id so the assertions do not depend on output order.
    tier_by_id = {row["id"]: row["tier"] for row in labelled}
    assert tier_by_id[1] == "low"
    assert tier_by_id[2] == "mid"
    assert tier_by_id[3] == "high"
def test_between_string_column_not_between(spark, spark_mode):
    """Negating ``between(1, 10)`` with ``~`` keeps ``"15"`` and drops ``"5"``."""
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "5"},
        {"id": 2, "val": "15"},
    ]
    frame = spark.createDataFrame(records)
    outside = ~functions.col("val").between(1, 10)
    kept = frame.filter(outside).collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "15"
def test_between_string_column_chained_with_select(spark, spark_mode):
    """Chain filter -> select -> filter, using ``between`` in both filters.

    After ``between(1, 10)``, projecting ``id``/``val`` and ``between(5, 10)``,
    only the ``"7"`` row is expected.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "3", "name": "a"},
        {"id": 2, "val": "7", "name": "b"},
        {"id": 3, "val": "15", "name": "c"},
    ]
    frame = spark.createDataFrame(records)
    first_pass = frame.filter(functions.col("val").between(1, 10))
    projected = first_pass.select(functions.col("id"), functions.col("val"))
    second_pass = projected.filter(functions.col("val").between(5, 10))
    kept = second_pass.collect()
    assert len(kept) == 1
    assert kept[0]["val"] == "7"
def test_between_string_column_zero_bounds(spark, spark_mode):
    """A lower bound of zero: ``between(0, 10)`` keeps ``"0"`` and ``"5"``.

    ``"-1"`` falls below the range and must be dropped.
    """
    functions = get_imports(spark_mode).F
    records = [
        {"id": 1, "val": "0"},
        {"id": 2, "val": "5"},
        {"id": 3, "val": "-1"},
    ]
    frame = spark.createDataFrame(records)
    kept = frame.filter(functions.col("val").between(0, 10)).collect()
    assert len(kept) == 2
    # Compare as a set so the check does not depend on output order.
    seen = {row["val"] for row in kept}
    assert seen == {"0", "5"}