import os
import pytest
from tests.fixtures.spark_imports import get_spark_imports
# Resolve the Spark modules through the shared fixture helper.
# NOTE(review): presumably this lets the suite switch backends without
# touching the tests -- confirm against tests/fixtures/spark_imports.
_imports = get_spark_imports()
# Functions namespace used by the tests below (F.col, F.expr, and F.try_cast
# when the backend provides it).
F = _imports.F
import datetime
def test_cast_datetime_string_to_date(spark) -> None:
    """Cast a 'YYYY-MM-DD HH:MM:SS' string column to date.

    The source string column must be left untouched and the new column must
    hold only the calendar date part of the value.
    """
    records = [{"date_str": "2025-01-01 10:30:00"}]
    source = spark.createDataFrame(records, schema=["date_str"])

    as_date = F.col("date_str").cast("date")
    collected = source.withColumn("d", as_date).collect()

    assert len(collected) == 1
    only_row = collected[0]
    # The original string survives the cast unchanged.
    assert only_row["date_str"] == "2025-01-01 10:30:00"
    # The time-of-day portion is dropped by the date cast.
    assert only_row["d"] == datetime.date(2025, 1, 1)
def test_cast_date_only_string_to_date(spark) -> None:
    """Cast a plain 'YYYY-MM-DD' string column to date."""
    records = [{"date_str": "2025-01-01"}]
    source = spark.createDataFrame(records, schema=["date_str"])

    as_date = F.col("date_str").cast("date")
    collected = source.withColumn("d", as_date).collect()

    assert len(collected) == 1
    assert collected[0]["d"] == datetime.date(2025, 1, 1)
@pytest.mark.skipif(
    (
        os.environ.get("SPARKLESS_TEST_BACKEND")
        or os.environ.get("MOCK_SPARK_TEST_BACKEND")
        or ""
    ).strip().lower() == "pyspark",
    reason="Skipped in PySpark mode (driver/worker Python version mismatch with pytest-xdist)",
)
def test_try_cast_datetime_string_to_date_invalid_null(spark) -> None:
    """try_cast to date yields the date for valid input and None for garbage.

    Uses ``F.try_cast`` when the functions namespace exposes it, otherwise
    falls back to the equivalent SQL expression.
    """
    records = [{"s": "2025-01-01 10:30:00"}, {"s": "not-a-date"}]
    source = spark.createDataFrame(records, schema=["s"])

    # Build the try_cast column with whichever API the backend offers.
    if getattr(F, "try_cast", None) is None:
        date_col = F.expr("try_cast(s as date)")
    else:
        date_col = F.try_cast(F.col("s"), "date")

    collected = source.select(date_col.alias("d")).collect()

    assert len(collected) == 2
    valid_row, invalid_row = collected[0], collected[1]
    assert valid_row["d"] == datetime.date(2025, 1, 1)
    # Unparseable input must produce null rather than raising.
    assert invalid_row["d"] is None