class TestToTimestampCompatibility:
    """Exercise ``F.to_timestamp`` against columns of several input types.

    Every test resolves ``F`` (and any type classes it needs) through
    ``sparkless.testing.get_imports()`` and receives the session through the
    ``spark`` argument.
    """

    # Pattern shared by the tests that parse strings using a literal "T" separator.
    _ISO_FORMAT = "yyyy-MM-dd'T'HH:mm:ss"

    @staticmethod
    def _single_column_frame(spark, imports, column, data_type, value):
        """Build a one-row DataFrame holding one nullable, explicitly typed column."""
        schema = imports.StructType([imports.StructField(column, data_type, True)])
        return spark.createDataFrame([{column: value}], schema=schema)

    def test_to_timestamp_timestamp_type_pass_through(self, spark):
        """Re-applying ``to_timestamp`` to an already parsed column keeps its value."""
        from datetime import datetime

        from sparkless.testing import get_imports

        F = get_imports().F
        df = spark.createDataFrame(
            [("2024-01-01T10:00:00", "test")], ["timestamp_str", "name"]
        )
        df = df.withColumn(
            "ts", F.to_timestamp(df["timestamp_str"], self._ISO_FORMAT)
        )
        # The second conversion takes the column produced above as its input.
        result = df.withColumn("ts2", F.to_timestamp(df["ts"], self._ISO_FORMAT))

        rows = result.collect()
        assert len(rows) == 1
        first = rows[0]
        assert isinstance(first["ts"], datetime)
        assert isinstance(first["ts2"], datetime)
        assert first["ts"] == first["ts2"]

    def test_to_timestamp_string_type_with_format(self, spark):
        """A string column parsed with an explicit format yields the expected datetime."""
        from datetime import datetime

        from sparkless.testing import get_imports

        F = get_imports().F
        df = spark.createDataFrame([("2024-01-01T10:00:00",)], ["timestamp_str"])
        result = df.withColumn(
            "ts", F.to_timestamp(F.col("timestamp_str"), self._ISO_FORMAT)
        )

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)
        assert rows[0]["ts"] == datetime(2024, 1, 1, 10, 0, 0)

    def test_to_timestamp_string_type_without_format(self, spark):
        """A string column converted without a format argument yields a datetime.

        Only the Python type of the result is asserted, not its value.
        """
        from datetime import datetime

        from sparkless.testing import get_imports

        F = get_imports().F
        df = spark.createDataFrame([("2024-01-01 10:00:00",)], ["timestamp_str"])
        result = df.withColumn("ts", F.to_timestamp(F.col("timestamp_str")))

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)

    def test_to_timestamp_integer_type_unix_timestamp(self, spark):
        """An IntegerType column converts to a datetime (type asserted, not value)."""
        from datetime import datetime

        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        # 1704110400 seconds after the epoch is 2024-01-01 12:00:00 UTC.
        df = self._single_column_frame(
            spark, imports, "unix_ts", imports.IntegerType(), 1704110400
        )
        result = df.withColumn("ts", F.to_timestamp(F.col("unix_ts")))

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)

    def test_to_timestamp_long_type_unix_timestamp(self, spark):
        """A LongType column converts to a datetime (type asserted, not value)."""
        from datetime import datetime

        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        # 1704110400 seconds after the epoch is 2024-01-01 12:00:00 UTC.
        df = self._single_column_frame(
            spark, imports, "unix_ts", imports.LongType(), 1704110400
        )
        result = df.withColumn("ts", F.to_timestamp(F.col("unix_ts")))

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)

    def test_to_timestamp_date_type_conversion(self, spark):
        """A DateType column converts to a datetime that falls on the same date."""
        from datetime import date, datetime

        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        expected_day = date(2024, 1, 1)
        df = self._single_column_frame(
            spark, imports, "date_col", imports.DateType(), expected_day
        )
        result = df.withColumn("ts", F.to_timestamp(F.col("date_col")))

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)
        assert rows[0]["ts"].date() == expected_day

    def test_to_timestamp_double_type_unix_timestamp(self, spark):
        """A DoubleType column converts to a datetime (type asserted, not value)."""
        from datetime import datetime

        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        # Same instant as the integer tests plus half a second.
        df = self._single_column_frame(
            spark, imports, "unix_ts", imports.DoubleType(), 1704110400.5
        )
        result = df.withColumn("ts", F.to_timestamp(F.col("unix_ts")))

        rows = result.collect()
        assert len(rows) == 1
        assert isinstance(rows[0]["ts"], datetime)

    def test_to_timestamp_rejects_unsupported_type(self, spark):
        """Apply ``to_timestamp`` to a BooleanType column and collect the result.

        Despite the name, this test only verifies that the conversion runs
        and produces one row; it asserts neither an error nor the value of
        ``ts``.
        NOTE(review): confirm whether a rejection (exception or null result)
        is the intended contract and should be asserted here.
        """
        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        df = self._single_column_frame(
            spark, imports, "bool_col", imports.BooleanType(), True
        )
        result = df.withColumn("ts", F.to_timestamp(F.col("bool_col")))

        rows = result.collect()
        assert len(rows) == 1

    def test_to_timestamp_after_regexp_replace(self, spark):
        """Parse strings whose fractional seconds were stripped by ``regexp_replace``.

        Checks that the parsed column is reported as ``TimestampType`` in the
        schema and that every collected value is either ``None`` or a
        ``datetime``.
        """
        from datetime import datetime, timedelta

        from sparkless.testing import get_imports

        imports = get_imports()
        F = imports.F
        StructType = imports.StructType
        StructField = imports.StructField
        StringType = imports.StringType

        # Fixed reference instant with a non-zero microsecond part: isoformat()
        # then always emits a fractional suffix for regexp_replace to strip,
        # and the input no longer depends on the wall clock at run time.
        base = datetime(2024, 1, 1, 12, 0, 0, 123456)
        test_data = [
            {
                "id": f"record-{i:03d}",
                "timestamp_str": (base - timedelta(hours=i)).isoformat(),
            }
            for i in range(5)
        ]
        schema = StructType(
            [
                StructField("id", StringType(), False),
                StructField("timestamp_str", StringType(), False),
            ]
        )
        df = spark.createDataFrame(test_data, schema)

        # Drop the ".ffffff" part so the remainder matches _ISO_FORMAT.
        df_clean = df.withColumn(
            "timestamp_clean",
            F.regexp_replace(F.col("timestamp_str"), r"\.\d+", ""),
        )
        df_parsed = df_clean.withColumn(
            "timestamp_parsed",
            F.to_timestamp(F.col("timestamp_clean"), self._ISO_FORMAT),
        )

        # Compare by class name rather than by class object.
        schema_dict = {
            field.name: type(field.dataType).__name__
            for field in df_parsed.schema.fields
        }
        assert schema_dict["timestamp_parsed"] == "TimestampType"

        rows = df_parsed.collect()
        assert len(rows) == 5
        for row in rows:
            assert row["timestamp_parsed"] is None or isinstance(
                row["timestamp_parsed"], datetime
            )