import datetime
from tests.fixtures.spark_imports import get_spark_imports
def test_date_less_than_datetime(spark, spark_backend):
    """Check that a date column can be filtered as ``<`` a datetime column.

    Both rows carry the date 2024-01-01. Only Bob's datetime (2024-01-01
    12:00) is expected to satisfy ``Date < DateTime``, so exactly his row
    should come back with its values intact.
    """
    functions = get_spark_imports(spark_backend).F
    alice = {
        "Name": "Alice",
        "DateTime": datetime.datetime(2023, 1, 1, 12, 0, 0),
        "Date": datetime.date(2024, 1, 1),
    }
    bob = {
        "Name": "Bob",
        "DateTime": datetime.datetime(2024, 1, 1, 12, 0, 0),
        "Date": datetime.date(2024, 1, 1),
    }
    frame = spark.createDataFrame([alice, bob])

    date_before_datetime = functions.col("Date") < functions.col("DateTime")
    matched = frame.filter(date_before_datetime).collect()

    assert len(matched) == 1
    survivor = matched[0]
    assert survivor["Name"] == "Bob"
    assert survivor["Date"] == datetime.date(2024, 1, 1)
    assert survivor["DateTime"] == datetime.datetime(2024, 1, 1, 12, 0, 0)
def test_datetime_greater_than_date(spark, spark_backend):
    """Check that a datetime column can be filtered as ``>`` a date column.

    The first row (datetime 2024-06-15 10:00 vs. date 2024-01-01) is
    expected to match. The second row (datetime 2023-06-15 10:00 vs. the
    same date) is expected to be rejected, so a filter that lets every row
    through cannot pass this test.
    """
    F_backend = get_spark_imports(spark_backend).F
    df = spark.createDataFrame(
        [
            {
                "dt": datetime.datetime(2024, 6, 15, 10, 0, 0),
                "d": datetime.date(2024, 1, 1),
            },
            {
                # Negative case: this datetime lies before the date.
                "dt": datetime.datetime(2023, 6, 15, 10, 0, 0),
                "d": datetime.date(2024, 1, 1),
            },
        ]
    )
    result = df.filter(F_backend.col("dt") > F_backend.col("d"))
    rows = result.collect()
    assert len(rows) == 1
    # Pin the surviving row's content, not just the row count.
    assert rows[0]["dt"] == datetime.datetime(2024, 6, 15, 10, 0, 0)
    assert rows[0]["d"] == datetime.date(2024, 1, 1)
def test_date_eq_datetime(spark, spark_backend):
    """Check ``==`` between a date column and a datetime column.

    The test expects the date 2024-01-01 to compare equal only to the
    midnight timestamp of that day, not to the 12:00 one.
    """
    functions = get_spark_imports(spark_backend).F
    day = datetime.date(2024, 1, 1)
    records = [
        {"d": day, "dt": datetime.datetime(2024, 1, 1, 0, 0, 0)},
        {"d": day, "dt": datetime.datetime(2024, 1, 1, 12, 0, 0)},
    ]
    frame = spark.createDataFrame(records)

    same_instant = functions.col("d") == functions.col("dt")
    matched = frame.filter(same_instant).collect()

    assert len(matched) == 1
    assert matched[0]["dt"] == datetime.datetime(2024, 1, 1, 0, 0, 0)
def test_date_lte_datetime(spark, spark_backend):
    """Check ``<=`` between a date column and a datetime column.

    With the date fixed at 2024-01-01, the midnight timestamp of that day
    is expected to satisfy ``d <= dt``; 2023-12-31 23:59 is not.
    """
    functions = get_spark_imports(spark_backend).F
    day = datetime.date(2024, 1, 1)
    records = [
        {"d": day, "dt": datetime.datetime(2024, 1, 1, 0, 0, 0)},
        {"d": day, "dt": datetime.datetime(2023, 12, 31, 23, 59, 0)},
    ]
    frame = spark.createDataFrame(records)

    date_not_after = functions.col("d") <= functions.col("dt")
    matched = frame.filter(date_not_after).collect()

    assert len(matched) == 1
    assert matched[0]["dt"] == datetime.datetime(2024, 1, 1, 0, 0, 0)
def test_date_gte_datetime(spark, spark_backend):
    """Check ``>=`` between a date column and a datetime column.

    Both rows share the timestamp 2024-01-01 12:00. Only the row dated
    2024-06-15 is expected to satisfy ``d >= dt``; 2023-12-01 is not.
    """
    functions = get_spark_imports(spark_backend).F
    moment = datetime.datetime(2024, 1, 1, 12, 0, 0)
    records = [
        {"d": datetime.date(2024, 6, 15), "dt": moment},
        {"d": datetime.date(2023, 12, 1), "dt": moment},
    ]
    frame = spark.createDataFrame(records)

    date_not_before = functions.col("d") >= functions.col("dt")
    matched = frame.filter(date_not_before).collect()

    assert len(matched) == 1
    assert matched[0]["d"] == datetime.date(2024, 6, 15)
def test_datetime_less_than_date(spark, spark_backend):
    """Check ``<`` with the datetime column on the left and the date on the right.

    Against the date 2024-01-01, only the 2023-06-15 10:00 timestamp is
    expected to satisfy ``dt < d``; the 2024-06-15 10:00 one is not.
    """
    functions = get_spark_imports(spark_backend).F
    day = datetime.date(2024, 1, 1)
    records = [
        {"dt": datetime.datetime(2023, 6, 15, 10, 0, 0), "d": day},
        {"dt": datetime.datetime(2024, 6, 15, 10, 0, 0), "d": day},
    ]
    frame = spark.createDataFrame(records)

    datetime_before_date = functions.col("dt") < functions.col("d")
    matched = frame.filter(datetime_before_date).collect()

    assert len(matched) == 1
    assert matched[0]["dt"] == datetime.datetime(2023, 6, 15, 10, 0, 0)
def test_date_ne_datetime(spark, spark_backend):
    """Check ``!=`` between a date column and a datetime column.

    With the date fixed at 2024-01-01, the 12:00 timestamp is expected to
    differ from it, while the midnight timestamp is expected not to.
    """
    functions = get_spark_imports(spark_backend).F
    day = datetime.date(2024, 1, 1)
    records = [
        {"d": day, "dt": datetime.datetime(2024, 1, 1, 12, 0, 0)},
        {"d": day, "dt": datetime.datetime(2024, 1, 1, 0, 0, 0)},
    ]
    frame = spark.createDataFrame(records)

    differs = functions.col("d") != functions.col("dt")
    matched = frame.filter(differs).collect()

    assert len(matched) == 1
    assert matched[0]["dt"] == datetime.datetime(2024, 1, 1, 12, 0, 0)
def test_date_datetime_chained_filter(spark, spark_backend):
    """Check a date/datetime filter followed by a second, unrelated filter.

    Rows 1 and 3 are expected to satisfy ``d < dt``; the follow-up
    ``id >= 2`` filter should then leave only row 3.
    """
    functions = get_spark_imports(spark_backend).F
    records = [
        {
            "id": 1,
            "d": datetime.date(2024, 1, 15),
            "dt": datetime.datetime(2024, 1, 20, 10, 0, 0),
        },
        {
            "id": 2,
            "d": datetime.date(2024, 1, 15),
            "dt": datetime.datetime(2024, 1, 10, 10, 0, 0),
        },
        {
            "id": 3,
            "d": datetime.date(2024, 1, 5),
            "dt": datetime.datetime(2024, 1, 20, 10, 0, 0),
        },
    ]
    frame = spark.createDataFrame(records)

    # Apply the two filters as separate, successive steps.
    after_date_filter = frame.filter(functions.col("d") < functions.col("dt"))
    after_id_filter = after_date_filter.filter(functions.col("id") >= 2)
    matched = after_id_filter.collect()

    assert len(matched) == 1
    assert matched[0]["id"] == 3
def test_date_datetime_with_and(spark, spark_backend):
    """Check a date/datetime comparison combined with another predicate via ``&``.

    Only row "A" is expected to satisfy both ``d < dt`` and ``name == "A"``.
    """
    functions = get_spark_imports(spark_backend).F
    day = datetime.date(2024, 1, 1)
    records = [
        {"name": "A", "d": day, "dt": datetime.datetime(2024, 1, 1, 12, 0, 0)},
        {"name": "B", "d": day, "dt": datetime.datetime(2023, 1, 1, 12, 0, 0)},
    ]
    frame = spark.createDataFrame(records)

    date_before_datetime = functions.col("d") < functions.col("dt")
    named_a = functions.col("name") == "A"
    matched = frame.filter(date_before_datetime & named_a).collect()

    assert len(matched) == 1
    assert matched[0]["name"] == "A"
def test_date_datetime_orderby(spark, spark_backend):
    """Check that ``orderBy`` works on the output of a date/datetime filter.

    All three rows are expected to satisfy ``d < dt``; after ordering by
    ``id`` they should come back as 1, 2, 3.
    """
    functions = get_spark_imports(spark_backend).F
    records = [
        {
            "id": 1,
            "d": datetime.date(2024, 3, 1),
            "dt": datetime.datetime(2024, 3, 15, 0, 0, 0),
        },
        {
            "id": 2,
            "d": datetime.date(2024, 1, 1),
            "dt": datetime.datetime(2024, 1, 15, 0, 0, 0),
        },
        {
            "id": 3,
            "d": datetime.date(2024, 2, 1),
            "dt": datetime.datetime(2024, 2, 15, 0, 0, 0),
        },
    ]
    frame = spark.createDataFrame(records)

    filtered = frame.filter(functions.col("d") < functions.col("dt"))
    ordered = filtered.orderBy("id").collect()

    assert len(ordered) == 3
    ids = [row["id"] for row in ordered]
    assert ids == [1, 2, 3]
def test_exact_scenario_from_issue_431(spark, spark_backend):
    """Run the Alice/Bob date-vs-datetime filter, including ``show()``.

    Both rows carry the date 2024-01-01. Only Bob's datetime (2024-01-01
    12:00) is expected to satisfy ``Date < DateTime``, so exactly his row
    should be returned.
    """
    F_backend = get_spark_imports(spark_backend).F
    df = spark.createDataFrame(
        [
            {
                "Name": "Alice",
                "DateTime": datetime.datetime(2023, 1, 1, 12, 0, 0),
                "Date": datetime.date(2024, 1, 1),
            },
            {
                "Name": "Bob",
                "DateTime": datetime.datetime(2024, 1, 1, 12, 0, 0),
                "Date": datetime.date(2024, 1, 1),
            },
        ]
    )
    result = df.filter(F_backend.col("Date") < F_backend.col("DateTime"))
    # show() and collect() must be separate statements. show() is kept so
    # that displaying the filtered frame is exercised as well as collecting.
    result.show()
    rows = result.collect()
    assert len(rows) == 1
    assert rows[0]["Name"] == "Bob"