from tests.tools.parity_base import ParityTestBase
from sparkless.testing import get_imports
class TestFormatStringParity(ParityTestBase):
    """Exercise ``F.format_string`` through the ``spark`` session fixture.

    Each test builds a two-row DataFrame, adds one column produced by
    ``format_string`` and checks the rendered text for every row.

    NOTE(review): the "parity" naming suggests the results are expected to
    match PySpark's output -- confirm against ``ParityTestBase``.
    """

    def test_format_string_basic_parity(self, spark):
        """Join a string column and an integer column using ``%s-%s``."""
        functions = get_imports().F
        people = spark.createDataFrame(
            [
                {"Name": "Alice", "StringValue": "abc", "IntegerValue": 123},
                {"Name": "Bob", "StringValue": "def", "IntegerValue": 456},
            ]
        )

        formatted = functions.format_string(
            "%s-%s",
            functions.col("StringValue"),
            functions.col("IntegerValue"),
        )
        collected = people.withColumn("NewValue", formatted).collect()

        assert len(collected) == 2
        # Key the output by name so the checks do not rely on row order.
        new_value_by_name = {row["Name"]: row["NewValue"] for row in collected}
        assert new_value_by_name["Alice"] == "abc-123"
        assert new_value_by_name["Bob"] == "def-456"

    def test_format_string_multiple_args_parity(self, spark):
        """Interpolate three columns, mixing ``%s`` and ``%d`` specifiers."""
        functions = get_imports().F
        people = spark.createDataFrame(
            [
                {"Name": "Alice", "Age": 25, "City": "NYC"},
                {"Name": "Bob", "Age": 30, "City": "LA"},
            ]
        )

        sentence = functions.format_string(
            "%s is %d years old and lives in %s",
            functions.col("Name"),
            functions.col("Age"),
            functions.col("City"),
        )
        collected = people.withColumn("Info", sentence).collect()

        assert len(collected) == 2
        # Key the output by name so the checks do not rely on row order.
        info_by_name = {row["Name"]: row["Info"] for row in collected}
        assert info_by_name["Alice"] == "Alice is 25 years old and lives in NYC"
        assert info_by_name["Bob"] == "Bob is 30 years old and lives in LA"

    def test_format_string_with_null_parity(self, spark):
        """Expect a ``None`` argument to be rendered as the text ``null``."""
        functions = get_imports().F
        people = spark.createDataFrame(
            [
                {"Name": "Alice", "Value": "abc", "Number": 123},
                {"Name": "Bob", "Value": None, "Number": 456},
            ]
        )

        formatted = functions.format_string(
            "%s-%s",
            functions.col("Value"),
            functions.col("Number"),
        )
        collected = people.withColumn("NewValue", formatted).collect()

        assert len(collected) == 2
        # Key the output by name so the checks do not rely on row order.
        new_value_by_name = {row["Name"]: row["NewValue"] for row in collected}
        assert new_value_by_name["Alice"] == "abc-123"
        # Bob's "Value" is None; the expected output carries the literal "null".
        assert new_value_by_name["Bob"] == "null-456"