import os
import pytest
from tests.fixtures.spark_imports import get_spark_imports
# Resolve the Spark entry points once per module through the shared test
# helper. Which implementation is returned is decided by get_spark_imports().
# SparkSession builds sessions; F is the column-functions namespace
# (used in this module as F.col, F.hour, F.regexp_replace, ...).
_imports = get_spark_imports()
SparkSession, F = _imports.SparkSession, _imports.F
@pytest.fixture
def enable_cache():
    """Turn on the expression-translation-cache feature flag for one test.

    Sets ``SPARKLESS_FEATURE_ENABLE_EXPRESSION_TRANSLATION_CACHE=1`` before the
    test body runs. Afterwards it restores the variable to its previous state:
    it is removed if it was unset, or reset to its prior value. This keeps the
    flag from leaking into other tests in the same process.

    Yields:
        None. The fixture exists only for its environment side effect.
    """
    # One constant for set/check/delete. os.environ is case-sensitive on
    # POSIX, so a differently-cased key would never match during teardown.
    key = "SPARKLESS_FEATURE_ENABLE_EXPRESSION_TRANSLATION_CACHE"
    previous = os.environ.get(key)
    os.environ[key] = "1"
    try:
        yield
    finally:
        if previous is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous
def test_nested_operations_with_drop(enable_cache):
    """Columns removed by ``select`` must not be referenced when the plan runs.

    Builds the chain withColumn -> withColumn -> select -> withColumn -> filter.
    The ``select`` drops ``impression_date`` and ``date_cleaned``. The chain is
    built with the ``enable_cache`` fixture active and must then execute
    (``count`` and ``collect``) over all 200 rows.

    If execution raises an error that names a dropped column and looks like a
    resolution failure, the test fails with an explicit "BUG REPRODUCED"
    message. Any other error is re-raised unchanged. The session is always
    stopped, even when the test fails.
    """
    spark = SparkSession.builder.appName("nested_ops").getOrCreate()
    try:
        data = [
            (f"imp_{i:03d}", f"2024-01-15T10:30:45.{i:06d}", f"campaign_{i}")
            for i in range(200)
        ]
        df = spark.createDataFrame(
            data, ["impression_id", "impression_date", "campaign_id"]
        )

        df_result = (
            df.withColumn(
                "date_cleaned",
                # Strip the fractional-seconds suffix so the pattern below matches.
                F.regexp_replace(F.col("impression_date"), r"\.\d+", ""),
            )
            .withColumn(
                "date_parsed",
                F.to_timestamp(
                    F.col("date_cleaned").cast("string"), "yyyy-MM-dd'T'HH:mm:ss"
                ),
            )
            # Drops impression_date and date_cleaned from the visible schema.
            .select("impression_id", "campaign_id", "date_parsed")
            .withColumn("hour", F.hour(F.col("date_parsed")))
            .filter(F.col("hour").isNotNull())
        )

        assert "impression_date" not in df_result.columns
        assert "date_cleaned" not in df_result.columns

        dropped_columns = ("impression_date", "date_cleaned")
        resolution_markers = ("cannot resolve", "not found", "unable to find")
        # Keep the try narrow: only plan execution is classified as the bug.
        try:
            count = df_result.count()
            rows = df_result.collect()
        except Exception as e:
            error_msg = str(e).lower()
            if any(col in error_msg for col in dropped_columns) and any(
                marker in error_msg for marker in resolution_markers
            ):
                pytest.fail(
                    f"BUG REPRODUCED! Nested operations with drop failed: {e}\n"
                    "This suggests that lazy frame execution plan preserves column references."
                )
            raise

        assert count == 200
        assert len(rows) == 200
    finally:
        # Previously skipped on any failure, leaking the session into later tests.
        spark.stop()
def test_lazy_frame_reuse_after_select(enable_cache):
    """Operations chained after ``select`` must not resolve a dropped column.

    Builds the chain withColumn(date_cleaned) -> select -> withColumn(id_copy)
    -> filter. The ``select`` drops ``impression_date``. The chain is built with
    the ``enable_cache`` fixture active and must then execute (``count`` and
    ``collect``) over all 200 rows.

    If execution raises a resolution-style error that names
    ``impression_date``, the test fails with an explicit "BUG REPRODUCED"
    message. Any other error is re-raised unchanged. The session is always
    stopped, even when the test fails.
    """
    spark = SparkSession.builder.appName("lazy_reuse").getOrCreate()
    try:
        data = [
            (f"imp_{i:03d}", f"2024-01-15T10:30:45.{i:06d}", f"campaign_{i}")
            for i in range(200)
        ]
        df = spark.createDataFrame(
            data, ["impression_id", "impression_date", "campaign_id"]
        )

        df_result = (
            df.withColumn(
                "date_cleaned",
                F.regexp_replace(F.col("impression_date"), r"\.\d+", ""),
            )
            # Drops impression_date; later steps must not depend on it.
            .select("impression_id", "campaign_id", "date_cleaned")
            .withColumn("id_copy", F.col("impression_id"))
            .filter(F.col("campaign_id").isNotNull())
        )

        assert "impression_date" not in df_result.columns

        resolution_markers = ("cannot resolve", "not found", "unable to find")
        # Keep the try narrow: only plan execution is classified as the bug.
        try:
            count = df_result.count()
            rows = df_result.collect()
        except Exception as e:
            error_msg = str(e).lower()
            if "impression_date" in error_msg and any(
                marker in error_msg for marker in resolution_markers
            ):
                pytest.fail(
                    f"BUG REPRODUCED! Execution plan still references dropped column: {e}"
                )
            raise

        assert count == 200
        assert len(rows) == 200
    finally:
        # Previously skipped on any failure, leaking the session into later tests.
        spark.stop()