from sparkless.testing import get_imports
# Resolve the Spark API names through the sparkless testing helper instead of
# importing them directly.
# NOTE(review): presumably get_imports() picks the backend under test
# (sparkless or real PySpark) — confirm against sparkless.testing.
imports = get_imports()
# Module-level aliases so the tests below can use the conventional names.
SparkSession = imports.SparkSession
StringType = imports.StringType
IntegerType = imports.IntegerType
StructType = imports.StructType
StructField = imports.StructField
F = imports.F
class TestColumnOrderingNulls:
    """Tests for the null-placement ordering methods on columns.

    Exercises ``asc_nulls_first``, ``asc_nulls_last``, ``desc_nulls_first`` and
    ``desc_nulls_last`` through ``DataFrame.orderBy`` and ``DataFrame.sort``.
    Each test takes a ``spark`` session argument (presumably a pytest fixture
    defined outside this file).
    """

    @staticmethod
    def _single_column_df(spark, name, data_type, values):
        """Build a DataFrame with one nullable column *name* holding *values*."""
        schema = StructType([StructField(name, data_type, True)])
        return spark.createDataFrame(
            [{name: value} for value in values], schema=schema
        )

    @staticmethod
    def _collect_column(df, name):
        """Collect *df* and return column *name* as a list in row order."""
        return [row[name] for row in df.collect()]

    @staticmethod
    def _collect_tuples(df, *names):
        """Collect *df* and return one tuple of the *names* values per row."""
        return [tuple(row[name] for name in names) for row in df.collect()]

    def test_desc_nulls_last_basic(self, spark):
        """``desc_nulls_last`` sorts strings descending with the null last."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "B", None, "C", "D"]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["D", "C", "B", "A", None]

    def test_desc_nulls_first(self, spark):
        """``desc_nulls_first`` puts the null before the descending strings."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "B", None, "C", "D"]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_first())
        assert self._collect_column(ordered, "value") == [None, "D", "C", "B", "A"]

    def test_asc_nulls_last(self, spark):
        """``asc_nulls_last`` sorts strings ascending with the null last."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "B", None, "C", "D"]
        )
        ordered = df.orderBy(F.col("value").asc_nulls_last())
        assert self._collect_column(ordered, "value") == ["A", "B", "C", "D", None]

    def test_asc_nulls_first(self, spark):
        """``asc_nulls_first`` puts the null before the ascending strings."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "B", None, "C", "D"]
        )
        ordered = df.orderBy(F.col("value").asc_nulls_first())
        assert self._collect_column(ordered, "value") == [None, "A", "B", "C", "D"]

    def test_desc_nulls_last_integers(self, spark):
        """``desc_nulls_last`` orders an integer column numerically."""
        df = self._single_column_df(
            spark, "age", IntegerType(), [25, 30, None, 20, 35]
        )
        ordered = df.orderBy(F.col("age").desc_nulls_last())
        assert self._collect_column(ordered, "age") == [35, 30, 25, 20, None]

    def test_desc_nulls_last_all_nulls(self, spark):
        """A column containing only nulls keeps all of its rows."""
        df = self._single_column_df(
            spark, "value", StringType(), [None, None, None]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == [None, None, None]

    def test_desc_nulls_last_no_nulls(self, spark):
        """Without nulls, ``desc_nulls_last`` is a plain descending sort."""
        df = self._single_column_df(spark, "value", StringType(), ["A", "B", "C"])
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["C", "B", "A"]

    def test_desc_nulls_last_multiple_nulls(self, spark):
        """Several nulls are all placed after the non-null values."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", None, "B", None, "C"]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["C", "B", "A", None, None]

    def test_asc_nulls_first_multiple_nulls(self, spark):
        """Several nulls are all placed before the non-null values."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", None, "B", None, "C"]
        )
        ordered = df.orderBy(F.col("value").asc_nulls_first())
        assert self._collect_column(ordered, "value") == [None, None, "A", "B", "C"]

    def test_desc_nulls_last_with_sort_method(self, spark):
        """``DataFrame.sort`` accepts ``desc_nulls_last`` like ``orderBy``."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "B", None, "C", "D"]
        )
        ordered = df.sort(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["D", "C", "B", "A", None]

    def test_column_methods_exist(self, spark):
        """All four ordering methods exist on a column and return a value.

        ``spark`` is not used in the body.
        NOTE(review): presumably requested so a session exists before
        ``F.col`` is called — confirm.
        """
        col = F.col("test")
        for method_name in (
            "desc_nulls_last",
            "desc_nulls_first",
            "asc_nulls_last",
            "asc_nulls_first",
        ):
            assert hasattr(col, method_name)
            assert getattr(col, method_name)() is not None

    def test_multiple_columns_ordering(self, spark):
        """Two sort keys with different null placement give one exact order."""
        schema = StructType(
            [
                StructField("category", StringType(), True),
                StructField("value", IntegerType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"category": "A", "value": 10},
                {"category": "A", "value": None},
                {"category": None, "value": 20},
                {"category": "B", "value": 15},
                {"category": None, "value": 5},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("category").asc_nulls_last(), F.col("value").desc_nulls_first()
        )
        # Check the secondary key too, not only the category column.
        assert self._collect_tuples(ordered, "category", "value") == [
            ("A", None),
            ("A", 10),
            ("B", 15),
            (None, 20),
            (None, 5),
        ]

    def test_desc_nulls_last_float(self, spark):
        """``desc_nulls_last`` orders floats; the schema is left to inference."""
        df = spark.createDataFrame(
            [{"score": score} for score in (3.14, 2.71, None, 1.41, 4.67)],
        )
        rows = df.orderBy(F.col("score").desc_nulls_last()).collect()
        assert len(rows) == 5
        # zip stops after the four non-null expectations.
        for row, expected in zip(rows, (4.67, 3.14, 2.71, 1.41)):
            assert abs(row["score"] - expected) < 0.01
        assert rows[4]["score"] is None

    def test_desc_nulls_last_negative_numbers(self, spark):
        """Negative integers and zero sort numerically, null last."""
        df = self._single_column_df(
            spark, "temp", IntegerType(), [-5, 10, None, -10, 0]
        )
        ordered = df.orderBy(F.col("temp").desc_nulls_last())
        assert self._collect_column(ordered, "temp") == [10, 0, -5, -10, None]

    def test_asc_nulls_last_empty_dataframe(self, spark):
        """Ordering an empty DataFrame yields no rows."""
        df = self._single_column_df(spark, "value", StringType(), [])
        ordered = df.orderBy(F.col("value").asc_nulls_last())
        assert self._collect_column(ordered, "value") == []

    def test_desc_nulls_last_single_row(self, spark):
        """A single non-null row is returned unchanged."""
        df = self._single_column_df(spark, "value", StringType(), ["A"])
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["A"]

    def test_desc_nulls_last_single_null_row(self, spark):
        """A single null row is returned unchanged."""
        df = self._single_column_df(spark, "value", StringType(), [None])
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == [None]

    def test_nulls_at_beginning_middle_end(self, spark):
        """Nulls at the start, middle and end of the input are all regrouped."""
        df = self._single_column_df(
            spark, "value", StringType(), [None, "B", None, "A", None]
        )
        nulls_last = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(nulls_last, "value") == [
            "B",
            "A",
            None,
            None,
            None,
        ]
        nulls_first = df.orderBy(F.col("value").desc_nulls_first())
        assert self._collect_column(nulls_first, "value") == [
            None,
            None,
            None,
            "B",
            "A",
        ]

    def test_unicode_characters(self, spark):
        """With non-ASCII strings, ``asc_nulls_last`` still puts the null last."""
        df = self._single_column_df(
            spark, "value", StringType(), ["世界", None, "Hello", "🌍"]
        )
        ordered = df.orderBy(F.col("value").asc_nulls_last())
        values = self._collect_column(ordered, "value")
        # Only the null position is pinned; the relative order of the
        # non-ASCII strings is deliberately not asserted.
        assert len(values) == 4
        assert values[3] is None

    def test_mixed_asc_desc_nulls_variants(self, spark):
        """``asc_nulls_last`` plus ``desc_nulls_first`` give one exact order."""
        schema = StructType(
            [
                StructField("col1", StringType(), True),
                StructField("col2", IntegerType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"col1": "A", "col2": 1},
                {"col1": "A", "col2": None},
                {"col1": None, "col2": 2},
                {"col1": "B", "col2": 1},
                {"col1": "B", "col2": None},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("col1").asc_nulls_last(), F.col("col2").desc_nulls_first()
        )
        # Asserting only the row count would pass for any ordering, so the
        # full expected order is pinned here.
        assert self._collect_tuples(ordered, "col1", "col2") == [
            ("A", None),
            ("A", 1),
            ("B", None),
            ("B", 1),
            (None, 2),
        ]

    def test_very_large_numbers(self, spark):
        """Large-magnitude integers sort numerically, null last."""
        df = self._single_column_df(
            spark, "value", IntegerType(), [999999999, None, -999999999, 0]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == [
            999999999,
            0,
            -999999999,
            None,
        ]

    def test_all_four_methods_comprehensive(self, spark):
        """All four ordering methods applied to the same DataFrame."""
        df = self._single_column_df(
            spark, "value", StringType(), ["Z", "A", None, "M"]
        )
        column = F.col("value")
        assert self._collect_column(
            df.orderBy(column.desc_nulls_last()), "value"
        ) == ["Z", "M", "A", None]
        assert self._collect_column(
            df.orderBy(column.desc_nulls_first()), "value"
        ) == [None, "Z", "M", "A"]
        assert self._collect_column(
            df.orderBy(column.asc_nulls_last()), "value"
        ) == ["A", "M", "Z", None]
        assert self._collect_column(
            df.orderBy(column.asc_nulls_first()), "value"
        ) == [None, "A", "M", "Z"]

    def test_comparison_with_default_desc_asc(self, spark):
        """Plain ``desc()`` yields the same row order as ``desc_nulls_last()``."""
        df = self._single_column_df(spark, "value", StringType(), ["B", None, "A"])
        default_desc = self._collect_column(df.orderBy(F.col("value").desc()), "value")
        explicit = self._collect_column(
            df.orderBy(F.col("value").desc_nulls_last()), "value"
        )
        # Compare every row, including the trailing null.
        assert default_desc == explicit

    def test_three_column_ordering(self, spark):
        """Three sort keys; the first two already determine the full order."""
        schema = StructType(
            [
                StructField("col1", StringType(), True),
                StructField("col2", IntegerType(), True),
                StructField("col3", StringType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"col1": "A", "col2": 1, "col3": "X"},
                {"col1": "A", "col2": None, "col3": "Y"},
                {"col1": None, "col2": 2, "col3": "Z"},
                {"col1": "B", "col2": 1, "col3": None},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("col1").asc_nulls_last(),
            F.col("col2").desc_nulls_first(),
            F.col("col3").asc_nulls_last(),
        )
        # Every (col1, col2) pair is unique, so col3 never breaks a tie.
        assert self._collect_tuples(ordered, "col1", "col2", "col3") == [
            ("A", None, "Y"),
            ("A", 1, "X"),
            ("B", 1, None),
            (None, 2, "Z"),
        ]

    def test_string_comparison_edge_cases(self, spark):
        """Empty, blank and mixed-case strings: the null still sorts last."""
        df = self._single_column_df(
            spark, "value", StringType(), ["", None, " ", "a", "A"]
        )
        ordered = df.orderBy(F.col("value").asc_nulls_last())
        values = self._collect_column(ordered, "value")
        # The relative order of "", " ", "a" and "A" is not asserted.
        assert len(values) == 5
        assert values[-1] is None

    def test_complex_ordering_scenario(self, spark):
        """Department, salary and name keys with nulls in every column."""
        schema = StructType(
            [
                StructField("dept", StringType(), True),
                StructField("name", StringType(), True),
                StructField("salary", IntegerType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"dept": "IT", "name": "Alice", "salary": 5000},
                {"dept": "IT", "name": None, "salary": 6000},
                {"dept": None, "name": "Bob", "salary": 4000},
                {"dept": "HR", "name": "Charlie", "salary": None},
                {"dept": "HR", "name": "David", "salary": 5500},
                {"dept": None, "name": None, "salary": None},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("dept").asc_nulls_last(),
            F.col("salary").desc_nulls_first(),
            F.col("name").asc_nulls_last(),
        )
        # Every (dept, salary) pair is unique, so name never breaks a tie.
        assert self._collect_tuples(ordered, "dept", "name", "salary") == [
            ("HR", "Charlie", None),
            ("HR", "David", 5500),
            ("IT", None, 6000),
            ("IT", "Alice", 5000),
            (None, None, None),
            (None, "Bob", 4000),
        ]

    def test_ordering_with_duplicate_values(self, spark):
        """Duplicate values stay grouped together, null last."""
        df = self._single_column_df(
            spark, "value", StringType(), ["A", "A", None, "B", "A"]
        )
        ordered = df.orderBy(F.col("value").desc_nulls_last())
        assert self._collect_column(ordered, "value") == ["B", "A", "A", "A", None]

    def test_ordering_with_identical_nulls(self, spark):
        """When the first key is null everywhere, the second key decides."""
        schema = StructType(
            [
                StructField("col1", StringType(), True),
                StructField("col2", IntegerType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"col1": None, "col2": 1},
                {"col1": None, "col2": 2},
                {"col1": None, "col2": None},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("col1").asc_nulls_last(), F.col("col2").desc_nulls_first()
        )
        # col2 is descending with its null first: None, 2, 1.
        assert self._collect_tuples(ordered, "col1", "col2") == [
            (None, None),
            (None, 2),
            (None, 1),
        ]

    def test_mixed_data_types_ordering(self, spark):
        """Four sort keys over string and integer columns; names are unique."""
        schema = StructType(
            [
                StructField("name", StringType(), True),
                StructField("age", IntegerType(), True),
                StructField("score", StringType(), True),
                StructField("active", StringType(), True),
            ]
        )
        df = spark.createDataFrame(
            [
                {"name": "Alice", "age": 25, "score": "85.5", "active": "true"},
                {"name": None, "age": 30, "score": None, "active": None},
                {"name": "Bob", "age": None, "score": "90.0", "active": "false"},
            ],
            schema=schema,
        )
        ordered = df.orderBy(
            F.col("name").asc_nulls_last(),
            F.col("age").desc_nulls_first(),
            F.col("score").asc_nulls_last(),
            F.col("active").desc_nulls_first(),
        )
        assert self._collect_column(ordered, "name") == ["Alice", "Bob", None]
class TestColumnOrderingParity:
    """Parity-named checks of the four null-placement ordering methods.

    The expected row order is hard-coded in each test. Every test takes a
    ``spark`` session argument (presumably a pytest fixture defined outside
    this file).
    """

    def test_pyspark_desc_nulls_last_parity(self, spark):
        """``desc_nulls_last``: strings descending, both nulls at the end."""
        value_schema = StructType([StructField("value", StringType(), True)])
        frame = spark.createDataFrame(
            [{"value": item} for item in ("Z", "A", None, "M", None)],
            schema=value_schema,
        )
        collected = frame.orderBy(F.col("value").desc_nulls_last()).collect()
        values = [row["value"] for row in collected]
        assert values == ["Z", "M", "A", None, None]
        # Explicit statement of the invariant: every non-null position comes
        # before every null position.
        filled_positions = [i for i, item in enumerate(values) if item is not None]
        null_positions = [i for i, item in enumerate(values) if item is None]
        if filled_positions and null_positions:
            assert max(filled_positions) < min(null_positions)

    def test_pyspark_desc_nulls_first_parity(self, spark):
        """``desc_nulls_first``: both nulls lead, then strings descending."""
        value_schema = StructType([StructField("value", StringType(), True)])
        frame = spark.createDataFrame(
            [{"value": item} for item in ("Z", "A", None, "M", None)],
            schema=value_schema,
        )
        collected = frame.orderBy(F.col("value").desc_nulls_first()).collect()
        values = [row["value"] for row in collected]
        assert values == [None, None, "Z", "M", "A"]
        # Explicit statement of the invariant: every null position comes
        # before every non-null position.
        null_positions = [i for i, item in enumerate(values) if item is None]
        filled_positions = [i for i, item in enumerate(values) if item is not None]
        if null_positions and filled_positions:
            assert max(null_positions) < min(filled_positions)

    def test_pyspark_asc_nulls_last_parity(self, spark):
        """``asc_nulls_last``: strings ascending, both nulls at the end."""
        value_schema = StructType([StructField("value", StringType(), True)])
        frame = spark.createDataFrame(
            [{"value": item} for item in ("Z", "A", None, "M", None)],
            schema=value_schema,
        )
        collected = frame.orderBy(F.col("value").asc_nulls_last()).collect()
        assert [row["value"] for row in collected] == ["A", "M", "Z", None, None]

    def test_pyspark_asc_nulls_first_parity(self, spark):
        """``asc_nulls_first``: both nulls lead, then strings ascending."""
        value_schema = StructType([StructField("value", StringType(), True)])
        frame = spark.createDataFrame(
            [{"value": item} for item in ("Z", "A", None, "M", None)],
            schema=value_schema,
        )
        collected = frame.orderBy(F.col("value").asc_nulls_first()).collect()
        assert [row["value"] for row in collected] == [None, None, "A", "M", "Z"]

    def test_pyspark_multi_column_parity(self, spark):
        """Two sort keys: ``col1`` ascending nulls last, ``col2`` descending nulls first."""
        pair_schema = StructType(
            [
                StructField("col1", StringType(), True),
                StructField("col2", IntegerType(), True),
            ]
        )
        pairs = (("A", 2), ("A", None), (None, 1), ("B", 3), (None, None))
        frame = spark.createDataFrame(
            [{"col1": first, "col2": second} for first, second in pairs],
            schema=pair_schema,
        )
        collected = frame.orderBy(
            F.col("col1").asc_nulls_last(), F.col("col2").desc_nulls_first()
        ).collect()
        assert len(collected) == 5
        assert [row["col1"] for row in collected] == ["A", "A", "B", None, None]
        # Only the two "A" rows have their secondary key checked.
        assert collected[0]["col2"] is None
        assert collected[1]["col2"] == 2

    def test_pyspark_integer_ordering_parity(self, spark):
        """``desc_nulls_last`` on integers, including negatives and zero."""
        value_schema = StructType([StructField("value", IntegerType(), True)])
        frame = spark.createDataFrame(
            [{"value": number} for number in (10, -5, None, 0, -10)],
            schema=value_schema,
        )
        collected = frame.orderBy(F.col("value").desc_nulls_last()).collect()
        assert [row["value"] for row in collected] == [10, 0, -5, -10, None]

    def test_pyspark_float_ordering_parity(self, spark):
        """``desc_nulls_last`` on floats; the schema is left to inference."""
        frame = spark.createDataFrame(
            [{"value": number} for number in (1.5, -2.5, None, 0.0, 3.14)],
        )
        collected = frame.orderBy(F.col("value").desc_nulls_last()).collect()
        assert len(collected) == 5
        # zip stops after the four non-null expectations.
        for row, expected in zip(collected, (3.14, 1.5, 0.0, -2.5)):
            assert abs(row["value"] - expected) < 0.01
        assert collected[4]["value"] is None