import os
import pytest
from tests.fixtures.spark_imports import get_spark_imports
# SparkSession is resolved through the shared fixture helper rather than being
# imported directly from a Spark package.
# NOTE(review): presumably this lets the suite run against different backends;
# confirm against tests.fixtures.spark_imports.
_imports = get_spark_imports()
SparkSession = _imports.SparkSession
class TestIssue287NAReplace:
    """Regression tests for ``DataFrame.na.replace`` (issue 287).

    Every test except the last builds its own session named ``issue-287``
    and stops it in a ``finally`` clause, so a failing assertion does not
    leave the session running. The last test takes a ``spark`` fixture.
    """

    @staticmethod
    def _row_named(rows, name):
        """Return the first row whose ``Name`` equals *name*.

        Fails the calling test when no such row was collected.
        """
        row = next((r for r in rows if r["Name"] == name), None)
        assert row is not None, f"no row with Name == {name!r}"
        return row

    def test_na_replace_with_dict_and_subset(self):
        """A dict mapping with a list ``subset`` rewrites only that column."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            map_value = {"A": "TypeA", "B": "TypeB"}
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A"},
                    {"Name": "Bob", "Type": "B"},
                ]
            )
            result = df.na.replace(map_value, subset=["Type"])
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            bob_row = self._row_named(rows, "Bob")
            assert bob_row["Type"] == "TypeB"
            # The Name column is outside the subset and must be untouched.
            assert alice_row["Name"] == "Alice"
            assert bob_row["Name"] == "Bob"
        finally:
            spark.stop()

    def test_na_replace_with_dict_no_subset(self):
        """A dict mapping without ``subset`` still rewrites matching values."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            map_value = {"A": "TypeA", "B": "TypeB"}
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A"},
                    {"Name": "Bob", "Type": "B"},
                ]
            )
            result = df.na.replace(map_value)
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            bob_row = self._row_named(rows, "Bob")
            assert bob_row["Type"] == "TypeB"
        finally:
            spark.stop()

    def test_na_replace_single_value(self):
        """A scalar ``to_replace`` / ``value`` pair rewrites every match."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 1},
                    {"Name": "Charlie", "Value": 2},
                ]
            )
            result = df.na.replace(1, 99, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == 99
            assert self._row_named(rows, "Bob")["Value"] == 99
            assert self._row_named(rows, "Charlie")["Value"] == 2
        finally:
            spark.stop()

    def test_na_replace_list_with_single_value(self):
        """A list of targets with a scalar value maps each target to it."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            result = df.na.replace([1, 2], 99, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == 99
            assert self._row_named(rows, "Bob")["Value"] == 99
            assert self._row_named(rows, "Charlie")["Value"] == 3
        finally:
            spark.stop()

    def test_na_replace_list_with_list(self):
        """Two equal-length lists are paired positionally."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            result = df.na.replace([1, 2], [10, 20], subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == 10
            assert self._row_named(rows, "Bob")["Value"] == 20
            assert self._row_named(rows, "Charlie")["Value"] == 3
        finally:
            spark.stop()

    def test_na_replace_with_string_subset(self):
        """``subset`` may be given as a single column name string."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            map_value = {"A": "TypeA", "B": "TypeB"}
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A"},
                    {"Name": "Bob", "Type": "B"},
                ]
            )
            result = df.na.replace(map_value, subset="Type")
            rows = result.collect()
            assert len(rows) == 2
            assert self._row_named(rows, "Alice")["Type"] == "TypeA"
            assert self._row_named(rows, "Bob")["Type"] == "TypeB"
        finally:
            spark.stop()

    def test_na_replace_with_tuple_subset(self):
        """``subset`` may be given as a tuple of column names."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            map_value = {"A": "TypeA", "B": "TypeB"}
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A", "Category": "A"},
                    {"Name": "Bob", "Type": "B", "Category": "B"},
                ]
            )
            result = df.na.replace(map_value, subset=("Type", "Category"))
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            assert alice_row["Category"] == "TypeA"
            bob_row = self._row_named(rows, "Bob")
            assert bob_row["Type"] == "TypeB"
            assert bob_row["Category"] == "TypeB"
        finally:
            spark.stop()

    def test_na_replace_multiple_columns(self):
        """A list ``subset`` with two columns rewrites both of them."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A", "Status": "A"},
                    {"Name": "Bob", "Type": "B", "Status": "B"},
                ]
            )
            result = df.na.replace(
                {"A": "TypeA", "B": "TypeB"}, subset=["Type", "Status"]
            )
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            assert alice_row["Status"] == "TypeA"
            bob_row = self._row_named(rows, "Bob")
            assert bob_row["Type"] == "TypeB"
            assert bob_row["Status"] == "TypeB"
        finally:
            spark.stop()

    def test_na_replace_with_numeric_values(self):
        """Float keys and values in the mapping are honoured."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Score": 1.0},
                    {"Name": "Bob", "Score": 2.0},
                    {"Name": "Charlie", "Score": 3.0},
                ]
            )
            result = df.na.replace({1.0: 10.0, 2.0: 20.0}, subset=["Score"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Score"] == 10.0
            assert self._row_named(rows, "Bob")["Score"] == 20.0
            assert self._row_named(rows, "Charlie")["Score"] == 3.0
        finally:
            spark.stop()

    def test_na_replace_no_matches(self):
        """A mapping whose keys match nothing leaves the data unchanged."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A"},
                    {"Name": "Bob", "Type": "B"},
                ]
            )
            result = df.na.replace({"X": "TypeX", "Y": "TypeY"}, subset=["Type"])
            rows = result.collect()
            assert len(rows) == 2
            assert self._row_named(rows, "Alice")["Type"] == "A"
            assert self._row_named(rows, "Bob")["Type"] == "B"
        finally:
            spark.stop()

    def test_na_replace_partial_matches(self):
        """Only values present in the mapping are rewritten."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A"},
                    {"Name": "Bob", "Type": "B"},
                    {"Name": "Charlie", "Type": "C"},
                ]
            )
            result = df.na.replace({"A": "TypeA", "B": "TypeB"}, subset=["Type"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Type"] == "TypeA"
            assert self._row_named(rows, "Bob")["Type"] == "TypeB"
            assert self._row_named(rows, "Charlie")["Type"] == "C"
        finally:
            spark.stop()

    def test_na_replace_empty_dataframe(self):
        """Replacing on a DataFrame with no rows yields no rows."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame([], schema="Name string, Type string")
            result = df.na.replace({"A": "TypeA"}, subset=["Type"])
            rows = result.collect()
            assert len(rows) == 0
        finally:
            spark.stop()

    def test_na_replace_chained_operations(self):
        """The result of ``na.replace`` can be filtered on the new values."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A", "Value": 1},
                    {"Name": "Bob", "Type": "B", "Value": 2},
                ]
            )
            result = df.na.replace(
                {"A": "TypeA", "B": "TypeB"}, subset=["Type"]
            ).filter("Type = 'TypeA'")
            rows = result.collect()
            assert len(rows) == 1
            alice_row = rows[0]
            assert alice_row["Name"] == "Alice"
            assert alice_row["Type"] == "TypeA"
        finally:
            spark.stop()

    def test_na_replace_with_none_values(self):
        """Passing ``None`` as ``to_replace`` is expected to raise."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": None},
                    {"Name": "Bob", "Value": 1},
                    {"Name": "Charlie", "Value": None},
                ]
            )
            with pytest.raises(Exception):
                df.na.replace(None, 0, subset=["Value"]).collect()
        finally:
            spark.stop()

    @pytest.mark.skipif(
        (
            os.environ.get("SPARKLESS_TEST_BACKEND")
            or os.environ.get("MOCK_SPARK_TEST_BACKEND")
            or ""
        )
        .strip()
        .lower()
        == "pyspark",
        reason="Skipped in PySpark mode (driver/worker Python version mismatch with pytest-xdist)",
    )
    def test_na_replace_with_none_as_replacement(self):
        """A dict value of ``None`` turns the matching cell into null."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            result = df.na.replace({2: None}, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Bob")["Value"] is None
            assert self._row_named(rows, "Alice")["Value"] == 1
        finally:
            spark.stop()

    def test_na_replace_with_boolean_values(self):
        """Replacing ``True`` with ``False`` leaves every row ``False``."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Active": True},
                    {"Name": "Bob", "Active": False},
                    {"Name": "Charlie", "Active": True},
                ]
            )
            result = df.na.replace(True, False, subset=["Active"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Active"] is False
            assert self._row_named(rows, "Charlie")["Active"] is False
            # Bob was already False and must stay that way.
            assert self._row_named(rows, "Bob")["Active"] is False
        finally:
            spark.stop()

    def test_na_replace_with_type_coercion(self):
        """String digits are replaced as strings, not as numbers."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": "1"},
                    {"Name": "Bob", "Value": "2"},
                    {"Name": "Charlie", "Value": "3"},
                ]
            )
            result = df.na.replace("1", "10", subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == "10"
        finally:
            spark.stop()

    def test_na_replace_with_special_characters(self):
        """Strings containing punctuation are matched in full."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Text": "Hello, World!"},
                    {"Name": "Bob", "Text": "Test@123"},
                    {"Name": "Charlie", "Text": "Hello, World!"},
                ]
            )
            result = df.na.replace("Hello, World!", "Hi", subset=["Text"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Text"] == "Hi"
            assert self._row_named(rows, "Charlie")["Text"] == "Hi"
        finally:
            spark.stop()

    def test_na_replace_with_unicode(self):
        """Strings containing non-ASCII characters are matched."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Text": "Hello 🌍"},
                    {"Name": "Bob", "Text": "Test"},
                    {"Name": "Charlie", "Text": "Hello 🌍"},
                ]
            )
            result = df.na.replace("Hello 🌍", "Hi World", subset=["Text"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Text"] == "Hi World"
        finally:
            spark.stop()

    def test_na_replace_with_zero_and_negative(self):
        """Zero and negative integers work as mapping keys."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 0},
                    {"Name": "Bob", "Value": -1},
                    {"Name": "Charlie", "Value": 5},
                ]
            )
            result = df.na.replace({0: 100, -1: 200}, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == 100
            assert self._row_named(rows, "Bob")["Value"] == 200
            assert self._row_named(rows, "Charlie")["Value"] == 5
        finally:
            spark.stop()

    def test_na_replace_with_empty_dict(self):
        """An empty mapping leaves the data unchanged."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.na.replace({}, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 2
            assert self._row_named(rows, "Alice")["Value"] == 1
        finally:
            spark.stop()

    def test_na_replace_with_empty_list(self):
        """An empty ``to_replace`` list leaves the data unchanged."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                ]
            )
            result = df.na.replace([], 99, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 2
            assert self._row_named(rows, "Alice")["Value"] == 1
        finally:
            spark.stop()

    def test_na_replace_invalid_subset_column(self):
        """A ``subset`` naming a missing column is expected to raise."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                ]
            )
            with pytest.raises(Exception):
                df.na.replace(1, 99, subset=["NonExistentColumn"]).collect()
        finally:
            spark.stop()

    def test_na_replace_mismatched_list_lengths(self):
        """Lists of different lengths are expected to raise."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                ]
            )
            with pytest.raises(Exception):
                df.na.replace([1, 2], [10], subset=["Value"]).collect()
        finally:
            spark.stop()

    def test_na_replace_none_value_with_scalar(self):
        """Mapping a single key to ``None`` nulls out the matching cell."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            result = df.na.replace({2: None}, subset=["Value"])
            rows = result.collect()
            assert self._row_named(rows, "Bob")["Value"] is None
        finally:
            spark.stop()

    def test_na_replace_none_value_with_list(self):
        """A list of targets with ``None`` as value either works or raises.

        When the call succeeds, the row count must be preserved.
        """
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": 1},
                    {"Name": "Bob", "Value": 2},
                    {"Name": "Charlie", "Value": 3},
                ]
            )
            try:
                result = df.na.replace([1, 2], None, subset=["Value"])
                rows = result.collect()
            except Exception:
                # NOTE(review): rejection of this call is tolerated here,
                # presumably because backends differ on it; confirm.
                pass
            else:
                # Checked outside the try so that a wrong row count is not
                # swallowed by the broad except clause above.
                assert len(rows) == 3
        finally:
            spark.stop()

    def test_na_replace_multiple_chained_operations(self):
        """Two ``na.replace`` calls can be chained on different columns."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A", "Status": "X"},
                    {"Name": "Bob", "Type": "B", "Status": "Y"},
                ]
            )
            result = df.na.replace({"A": "TypeA"}, subset=["Type"]).na.replace(
                {"X": "StatusX"}, subset=["Status"]
            )
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            assert alice_row["Status"] == "StatusX"
        finally:
            spark.stop()

    def test_na_replace_with_mixed_types_in_column(self):
        """A string column holding digit strings accepts string replacement."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Value": "1"},
                    {"Name": "Bob", "Value": "2"},
                    {"Name": "Charlie", "Value": "3"},
                ]
            )
            result = df.na.replace("1", "10", subset=["Value"])
            rows = result.collect()
            assert len(rows) == 3
            assert self._row_named(rows, "Alice")["Value"] == "10"
        finally:
            spark.stop()

    def test_na_replace_large_dataframe(self):
        """A three-key mapping is applied across a 100-row DataFrame."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            data = [{"Name": f"Person{i}", "Value": i % 3} for i in range(100)]
            df = spark.createDataFrame(data)
            result = df.na.replace({0: 100, 1: 200, 2: 300}, subset=["Value"])
            rows = result.collect()
            assert len(rows) == 100
            assert self._row_named(rows, "Person0")["Value"] == 100
            assert self._row_named(rows, "Person1")["Value"] == 200
        finally:
            spark.stop()

    def test_na_replace_preserves_other_columns(self):
        """Columns outside ``subset`` keep their original values."""
        spark = SparkSession.builder.appName("issue-287").getOrCreate()
        try:
            df = spark.createDataFrame(
                [
                    {"Name": "Alice", "Type": "A", "Age": 25, "City": "NYC"},
                    {"Name": "Bob", "Type": "B", "Age": 30, "City": "LA"},
                ]
            )
            result = df.na.replace({"A": "TypeA"}, subset=["Type"])
            rows = result.collect()
            assert len(rows) == 2
            alice_row = self._row_named(rows, "Alice")
            assert alice_row["Type"] == "TypeA"
            assert alice_row["Age"] == 25
            assert alice_row["City"] == "NYC"
            bob_row = self._row_named(rows, "Bob")
            assert bob_row["Type"] == "B"
            assert bob_row["Age"] == 30
            assert bob_row["City"] == "LA"
        finally:
            spark.stop()

    def test_na_replace_subset_column_name(self, spark):
        """Scalar replacement restricted to ``Value``, using the ``spark`` fixture.

        The session comes from the fixture, so this test does not stop it.
        """
        df = spark.createDataFrame(
            [
                {"Name": "Alice", "Value": 1},
                {"Name": "Bob", "Value": 2},
            ]
        )
        result = df.na.replace(1, 99, subset=["Value"])
        rows = result.collect()
        assert len(rows) == 2
        assert self._row_named(rows, "Alice")["Value"] == 99