1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
Test for issue #149: to_timestamp() string type detection. Uses get_imports from fixture only.
"""
from sparkless.testing import get_imports
# Every Spark name used in this module is taken from the object returned by
# get_imports(); nothing else is imported for the Spark API.
_imports = get_imports()
SparkSession, F = _imports.SparkSession, _imports.F

# Short aliases for the column functions exercised by the tests below.
col, to_timestamp, regexp_replace = F.col, F.to_timestamp, F.regexp_replace
class TestIssue149ToTimestampString:
    """Regression tests for issue #149: to_timestamp() string type detection.

    Each test builds a one-row DataFrame holding a datetime string, derives a
    ``date_parsed`` column by calling ``to_timestamp()`` on a string-producing
    expression, collects the result, and checks that the schema reports the
    new column as ``TimestampType``.
    """

    @staticmethod
    def _timestamp_field(df, column_name):
        """Return the schema field *column_name* after checking it is a timestamp.

        Args:
            df: DataFrame whose ``schema.fields`` is searched.
            column_name: Name of the column expected to hold timestamps.

        Returns:
            The matching schema field, so callers can assert on it further.

        Raises:
            AssertionError: If the column is missing from the schema, or the
                class of its data type is not named ``TimestampType``.
        """
        field = next((f for f in df.schema.fields if f.name == column_name), None)
        # Fail with a readable message instead of a bare StopIteration when
        # the derived column is absent from the schema.
        assert field is not None, (
            f"Column {column_name!r} not found in schema fields: "
            f"{[f.name for f in df.schema.fields]}"
        )
        # Compare by class name: this module does not import the
        # TimestampType class itself.
        assert field.dataType.__class__.__name__ == "TimestampType", (
            f"Expected TimestampType, got {field.dataType}"
        )
        return field

    def test_to_timestamp_with_regexp_replace_cast_string(self):
        """to_timestamp() accepts input built from regexp_replace().cast("string").

        This verifies the fix for issue #149, where to_timestamp() would fail
        with "expected output type 'Datetime('μs')', got 'String'" when the
        input comes from regexp_replace().cast("string").
        """
        spark = SparkSession.builder.appName("test_issue_149").getOrCreate()

        # Datetime string that carries a microsecond fraction.
        data = [("2024-01-15T10:30:45.123456",)]
        df = spark.createDataFrame(data, ["date_string"])

        # The pattern from the issue: strip the fractional part, cast the
        # result to string, then parse it.
        df_transformed = df.withColumn(
            "date_parsed",
            to_timestamp(
                regexp_replace(col("date_string"), r"\.\d+", "").cast("string"),
                "yyyy-MM-dd'T'HH:mm:ss",
            ),
        )

        # collect() must complete without a schema error.
        result = df_transformed.collect()
        assert len(result) == 1

        # The column must be reported as TimestampType, not StringType.
        date_parsed_field = self._timestamp_field(df_transformed, "date_parsed")

        # Only the schema is checked here; the parsed value itself is not
        # asserted on.
        assert date_parsed_field.nullable is True

    def test_to_timestamp_with_nested_cast_string(self):
        """to_timestamp() accepts a column that is cast directly to string.

        Despite the test name, a single ``.cast("string")`` is applied to the
        source column; no casts are nested.
        """
        spark = SparkSession.builder.appName("test_issue_149_nested").getOrCreate()

        data = [("2024-01-15T10:30:45",)]
        df = spark.createDataFrame(data, ["date_string"])

        # Cast the source column to string before parsing.
        df_transformed = df.withColumn(
            "date_parsed",
            to_timestamp(col("date_string").cast("string"), "yyyy-MM-dd'T'HH:mm:ss"),
        )

        # collect() must complete without a schema error.
        result = df_transformed.collect()
        assert len(result) == 1

        self._timestamp_field(df_transformed, "date_parsed")

    def test_to_timestamp_with_string_operations(self):
        """to_timestamp() accepts the output of regexp_replace() without a cast."""
        spark = SparkSession.builder.appName("test_issue_149_string_ops").getOrCreate()

        data = [("2024-01-15T10:30:45",)]
        df = spark.createDataFrame(data, ["date_string"])

        # Replace the "T" separator with a space, so the format string below
        # uses a space as well; no explicit cast is applied.
        df_transformed = df.withColumn(
            "date_parsed",
            to_timestamp(
                regexp_replace(col("date_string"), r"T", " "),
                "yyyy-MM-dd HH:mm:ss",
            ),
        )

        # collect() must complete without a schema error.
        result = df_transformed.collect()
        assert len(result) == 1

        self._timestamp_field(df_transformed, "date_parsed")