import os
import pytest
from sparkless.testing import get_imports
# Resolve the Spark API objects once at import time. Every name used by the
# tests below is taken from the object returned by ``get_imports()`` rather
# than imported directly.
# NOTE(review): presumably this lets the same tests run against either
# sparkless or real PySpark -- confirm against sparkless.testing.
_imports = get_imports()

# Session entry point and the functions namespace.
SparkSession, F = _imports.SparkSession, _imports.F

# Schema building blocks.
StructType, StructField = _imports.StructType, _imports.StructField

# Column data types.
StringType, IntegerType = _imports.StringType, _imports.IntegerType
LongType, DoubleType = _imports.LongType, _imports.DoubleType
class TestIssue295WithColumnRenamedNonexistent:
    """Tests for issue 295: renaming columns that do not exist.

    Each test builds a small DataFrame, calls ``withColumnRenamed`` or
    ``withColumnsRenamed`` with at least one source name that is absent from
    the schema, and asserts that the call is a no-op for that name: the row
    count and existing columns are unchanged and the requested new name is
    not added.

    Every test obtains a session via ``getOrCreate()`` and stops it in a
    ``finally`` block so the session is released even when an assertion
    fails.
    """

    def test_withColumnRenamed_nonexistent_column_no_op(self):
        """Renaming a nonexistent column leaves data and schema unchanged."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnRenamed("Does-Not-Exist", "Still-Does-Not-Exist")

            assert result.count() == 2
            assert set(result.columns) == {"Name", "Value"}
            assert "Does-Not-Exist" not in result.columns
            assert "Still-Does-Not-Exist" not in result.columns

            rows = result.collect()
            assert len(rows) == 2
            assert rows[0]["Name"] == "Alice"
            assert rows[0]["Value"] == 1
            assert rows[1]["Name"] == "Bob"
            assert rows[1]["Value"] == 2

            # The source DataFrame must be untouched as well.
            assert df.count() == 2
            assert set(df.columns) == {"Name", "Value"}
        finally:
            spark.stop()

    def test_withColumnRenamed_existing_column_works(self):
        """Renaming an existing column still renames it (regression guard)."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnRenamed("Name", "FullName")

            assert result.count() == 2
            assert "FullName" in result.columns
            assert "Name" not in result.columns
            assert "Value" in result.columns

            rows = result.collect()
            assert rows[0]["FullName"] == "Alice"
            assert rows[0]["Value"] == 1
            assert rows[1]["FullName"] == "Bob"
            assert rows[1]["Value"] == 2
        finally:
            spark.stop()

    def test_withColumnRenamed_case_insensitive_nonexistent(self):
        """An upper-case nonexistent source name is also a no-op."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnRenamed("DOES-NOT-EXIST", "new_name")

            assert result.count() == 2
            assert set(result.columns) == {"Name", "Value"}
            assert "new_name" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_chained_with_nonexistent(self):
        """A valid rename followed by a nonexistent rename keeps the first."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnRenamed("Name", "FullName").withColumnRenamed(
                "Does-Not-Exist", "Still-Does-Not-Exist"
            )

            assert result.count() == 2
            assert "FullName" in result.columns
            assert "Name" not in result.columns
            assert "Value" in result.columns
            assert "Does-Not-Exist" not in result.columns
            assert "Still-Does-Not-Exist" not in result.columns

            rows = result.collect()
            assert rows[0]["FullName"] == "Alice"
            assert rows[0]["Value"] == 1
        finally:
            spark.stop()

    def test_withColumnsRenamed_with_nonexistent_columns(self):
        """withColumnsRenamed applies valid entries and ignores missing ones."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnsRenamed(
                {
                    "Name": "FullName",
                    "Does-Not-Exist": "Still-Does-Not-Exist",
                }
            )

            assert result.count() == 2
            assert "FullName" in result.columns
            assert "Name" not in result.columns
            assert "Value" in result.columns
            assert "Does-Not-Exist" not in result.columns
            assert "Still-Does-Not-Exist" not in result.columns

            rows = result.collect()
            assert rows[0]["FullName"] == "Alice"
            assert rows[0]["Value"] == 1
        finally:
            spark.stop()

    def test_withColumnsRenamed_all_nonexistent_no_op(self):
        """withColumnsRenamed with only missing source names changes nothing."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.withColumnsRenamed(
                {
                    "Does-Not-Exist-1": "New-Name-1",
                    "Does-Not-Exist-2": "New-Name-2",
                }
            )

            assert result.count() == 2
            assert set(result.columns) == {"Name", "Value"}
            assert "New-Name-1" not in result.columns
            assert "New-Name-2" not in result.columns

            rows = result.collect()
            assert rows[0]["Name"] == "Alice"
            assert rows[0]["Value"] == 1
        finally:
            spark.stop()

    # The mode is read from SPARKLESS_TEST_MODE; a missing variable is treated
    # as an empty string, so the test runs unless the mode is "pyspark".
    @pytest.mark.skipif(
        os.environ.get("SPARKLESS_TEST_MODE", "").strip().lower() == "pyspark",
        reason="Skipped in PySpark mode (driver/worker Python version mismatch with pytest-xdist)",
    )
    def test_withColumnRenamed_after_operations(self):
        """A nonexistent rename after a filter keeps the filtered result."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.filter(df.Value > 1).withColumnRenamed(
                "Does-Not-Exist", "Still-Does-Not-Exist"
            )

            assert result.count() == 1
            assert set(result.columns) == {"Name", "Value"}
            assert "Does-Not-Exist" not in result.columns

            rows = result.collect()
            assert rows[0]["Name"] == "Bob"
            assert rows[0]["Value"] == 2
        finally:
            spark.stop()

    def test_withColumnRenamed_empty_dataframe(self):
        """A nonexistent rename on an empty, explicitly-typed frame is a no-op."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            schema = StructType(
                [
                    StructField("Name", StringType(), True),
                    StructField("Value", IntegerType(), True),
                ]
            )
            df = spark.createDataFrame([], schema)
            result = df.withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 0
            assert set(result.columns) == {"Name", "Value"}
            assert "New-Name" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_with_null_values(self):
        """Null cells survive a nonexistent rename unchanged."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": None},
                    {"Name": None, "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            result = df.withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 3
            assert set(result.columns) == {"Name", "Value"}
            assert "New-Name" not in result.columns

            rows = result.collect()
            assert rows[0]["Value"] is None
            assert rows[1]["Name"] is None
        finally:
            spark.stop()

    def test_withColumnRenamed_different_data_types(self):
        """A nonexistent rename keeps all six mixed-type columns and values."""
        from datetime import date, datetime

        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {
                        "name": "Alice",
                        "age": 25,
                        "salary": 50000.5,
                        "active": True,
                        "birth_date": date(1998, 1, 15),
                        "created_at": datetime(2023, 1, 1, 12, 0, 0),
                    },
                    {
                        "name": "Bob",
                        "age": 30,
                        "salary": 60000.0,
                        "active": False,
                        "birth_date": date(1993, 5, 20),
                        "created_at": datetime(2023, 2, 1, 14, 30, 0),
                    },
                ]
            )
            result = df.withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 2
            assert len(result.columns) == 6
            assert "Does-Not-Exist" not in result.columns
            assert "New-Name" not in result.columns

            rows = result.collect()
            assert isinstance(rows[0]["age"], int)
            assert isinstance(rows[0]["salary"], float)
            assert isinstance(rows[0]["active"], bool)
        finally:
            spark.stop()

    def test_withColumnRenamed_special_characters_in_names(self):
        """Nonexistent names containing ``@``, ``#`` and ``$`` are ignored."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"col_with_underscore": 1, "col-with-dash": 2, "col.with.dot": 3},
                    {"col_with_underscore": 4, "col-with-dash": 5, "col.with.dot": 6},
                ]
            )
            result = df.withColumnRenamed("col@with#special$chars", "new@col#name")

            assert result.count() == 2
            assert "col_with_underscore" in result.columns
            assert "col-with-dash" in result.columns
            assert "col.with.dot" in result.columns
            assert "col@with#special$chars" not in result.columns
            assert "new@col#name" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_unicode_column_names(self):
        """Nonexistent non-ASCII names are ignored; existing ones are kept."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"姓名": "Alice", "年龄": 25, "値": 100},
                    {"姓名": "Bob", "年龄": 30, "値": 200},
                ]
            )
            result = df.withColumnRenamed("不存在", "新列名")

            assert result.count() == 2
            assert "姓名" in result.columns
            assert "年龄" in result.columns
            assert "値" in result.columns
            assert "不存在" not in result.columns
            assert "新列名" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_very_long_column_name(self):
        """A 1000-character nonexistent name does not disturb the columns."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            long_col_name = "a" * 1000
            df = spark.createDataFrame([{long_col_name: 1, "short": 2}])
            # Same length as the real column, but a different name.
            result = df.withColumnRenamed("b" * 1000, "c" * 1000)

            assert result.count() == 1
            assert long_col_name in result.columns
            assert "short" in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_join(self):
        """A nonexistent rename after an inner join keeps the joined columns."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
            )
            df2 = spark.createDataFrame(
                [{"id": 1, "value": 100}, {"id": 2, "value": 200}]
            )
            result = df1.join(df2, on="id", how="inner").withColumnRenamed(
                "Does-Not-Exist", "Still-Does-Not-Exist"
            )

            assert result.count() == 2
            assert "id" in result.columns
            assert "name" in result.columns
            assert "value" in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_groupby(self):
        """A nonexistent rename after groupBy/agg keeps the aggregate columns."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"dept": "IT", "salary": 50000},
                    {"dept": "IT", "salary": 60000},
                    {"dept": "HR", "salary": 55000},
                ]
            )
            result = (
                df.groupBy("dept")
                .agg(F.avg("salary").alias("avg_salary"))
                .withColumnRenamed("Does-Not-Exist", "New-Name")
            )

            assert result.count() == 2
            assert "dept" in result.columns
            assert "avg_salary" in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_select(self):
        """A nonexistent rename after select keeps only the selected columns."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"name": "Alice", "age": 25, "salary": 50000},
                    {"name": "Bob", "age": 30, "salary": 60000},
                ]
            )
            result = df.select("name", "age").withColumnRenamed(
                "Does-Not-Exist", "New-Name"
            )

            assert result.count() == 2
            assert "name" in result.columns
            assert "age" in result.columns
            assert "salary" not in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_orderby(self):
        """A nonexistent rename after orderBy preserves the sort order."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"name": "Alice", "age": 25},
                    {"name": "Bob", "age": 30},
                    {"name": "Charlie", "age": 20},
                ]
            )
            result = df.orderBy("age").withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 3

            rows = result.collect()
            assert rows[0]["age"] == 20
            assert rows[1]["age"] == 25
            assert rows[2]["age"] == 30
        finally:
            spark.stop()

    def test_withColumnRenamed_multiple_chained_nonexistent(self):
        """Three chained nonexistent renames leave the schema unchanged."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25}])
            result = (
                df.withColumnRenamed("Does-Not-Exist-1", "New-Name-1")
                .withColumnRenamed("Does-Not-Exist-2", "New-Name-2")
                .withColumnRenamed("Does-Not-Exist-3", "New-Name-3")
            )

            assert result.count() == 1
            assert set(result.columns) == {"name", "age"}
            assert "New-Name-1" not in result.columns
            assert "New-Name-2" not in result.columns
            assert "New-Name-3" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_mixed_existing_and_nonexistent_chained(self):
        """Interleaved valid and nonexistent renames apply only the valid ones."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25, "city": "NYC"}])
            result = (
                df.withColumnRenamed("name", "full_name")
                .withColumnRenamed("Does-Not-Exist-1", "New-1")
                .withColumnRenamed("age", "years")
                .withColumnRenamed("Does-Not-Exist-2", "New-2")
            )

            assert result.count() == 1
            assert "full_name" in result.columns
            assert "years" in result.columns
            assert "city" in result.columns
            assert "name" not in result.columns
            assert "age" not in result.columns
            assert "New-1" not in result.columns
            assert "New-2" not in result.columns
        finally:
            spark.stop()

    def test_withColumnsRenamed_mixed_existing_nonexistent_complex(self):
        """withColumnsRenamed with two valid and three missing entries."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"a": 1, "b": 2, "c": 3, "d": 4},
                    {"a": 5, "b": 6, "c": 7, "d": 8},
                ]
            )
            result = df.withColumnsRenamed(
                {
                    "a": "A",
                    "Does-Not-Exist-1": "New-1",
                    "b": "B",
                    "Does-Not-Exist-2": "New-2",
                    "Does-Not-Exist-3": "New-3",
                }
            )

            assert result.count() == 2
            assert "A" in result.columns
            assert "B" in result.columns
            assert "c" in result.columns
            assert "d" in result.columns
            assert "a" not in result.columns
            assert "b" not in result.columns
            assert "New-1" not in result.columns
            assert "New-2" not in result.columns
            assert "New-3" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_union(self):
        """A nonexistent rename after union keeps both rows and columns."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"id": 1, "name": "Alice"}])
            df2 = spark.createDataFrame([{"id": 2, "name": "Bob"}])
            result = df1.union(df2).withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 2
            assert "id" in result.columns
            assert "name" in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_distinct(self):
        """A nonexistent rename after distinct keeps the de-duplicated rows."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"name": "Alice", "dept": "IT"},
                    {"name": "Alice", "dept": "IT"},
                    {"name": "Bob", "dept": "HR"},
                ]
            )
            result = df.distinct().withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 2
            assert "name" in result.columns
            assert "dept" in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_withColumn(self):
        """A nonexistent rename after withColumn keeps the derived column."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25}])
            result = df.withColumn("double_age", F.col("age") * 2).withColumnRenamed(
                "Does-Not-Exist", "New-Name"
            )

            assert result.count() == 1
            assert "name" in result.columns
            assert "age" in result.columns
            assert "double_age" in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_after_drop(self):
        """A nonexistent rename after drop does not resurrect the dropped column."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25, "city": "NYC"}])
            result = df.drop("city").withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result.count() == 1
            assert "name" in result.columns
            assert "age" in result.columns
            assert "city" not in result.columns
            assert "Does-Not-Exist" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_whitespace_in_column_names(self):
        """Nonexistent names containing spaces are ignored."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25}])
            result = df.withColumnRenamed("Does Not Exist", "New Name")

            assert result.count() == 1
            assert "name" in result.columns
            assert "age" in result.columns
            assert "Does Not Exist" not in result.columns
            assert "New Name" not in result.columns
        finally:
            spark.stop()

    def test_withColumnRenamed_complex_nested_operations(self):
        """A nonexistent rename at the end of a multi-step pipeline is a no-op."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"name": "Alice", "age": 25, "salary": 50000},
                    {"name": "Bob", "age": 30, "salary": 60000},
                    {"name": "Charlie", "age": 35, "salary": 70000},
                ]
            )
            result = (
                df.filter(F.col("age") > 25)
                .select("name", "age", "salary")
                .withColumn("bonus", F.col("salary") * 0.1)
                .orderBy(F.desc("salary"))
                .withColumnRenamed("Does-Not-Exist", "New-Name")
            )

            assert result.count() == 2
            assert "name" in result.columns
            assert "age" in result.columns
            assert "salary" in result.columns
            assert "bonus" in result.columns
            assert "Does-Not-Exist" not in result.columns

            # Descending salary order from the pipeline must be preserved.
            rows = result.collect()
            assert rows[0]["salary"] == 70000
            assert rows[1]["salary"] == 60000
        finally:
            spark.stop()

    def test_withColumnRenamed_idempotent_behavior(self):
        """Repeating the same nonexistent rename yields equivalent frames."""
        spark = SparkSession.builder.appName("issue-295").getOrCreate()
        try:
            df = spark.createDataFrame([{"name": "Alice", "age": 25}])
            result1 = df.withColumnRenamed("Does-Not-Exist", "New-Name")
            result2 = result1.withColumnRenamed("Does-Not-Exist", "New-Name")
            result3 = result2.withColumnRenamed("Does-Not-Exist", "New-Name")

            assert result1.count() == result2.count() == result3.count() == 1
            assert set(result1.columns) == set(result2.columns) == set(result3.columns)
            assert "Does-Not-Exist" not in result3.columns
            assert "New-Name" not in result3.columns
        finally:
            spark.stop()