robin-sparkless 4.3.0

PySpark-like DataFrame API in Rust on Polars; no JVM.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
"""
Global pytest configuration for unified Python tests.

Merge of tests/python and tests/upstream_sparkless conftests.
Backend selection: pytest marker, then MOCK_SPARK_TEST_BACKEND / SPARKLESS_TEST_BACKEND.
"""

from __future__ import annotations

import contextlib
import gc
import os
import re
import sys
import uuid

import pytest

# Prevent numpy crashes on macOS ARM chips with Python 3.9
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"

# Point PySpark's worker and driver interpreters at the Python running pytest so
# their versions cannot diverge (avoids PYTHON_VERSION_MISMATCH). Values that are
# already exported in the environment are left as they are.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
os.environ.setdefault("PYSPARK_DRIVER_PYTHON", sys.executable)


def _is_pyspark_mode() -> bool:
    """True when tests should run with real PySpark (no sparkless)."""
    return (
        os.getenv("MOCK_SPARK_TEST_BACKEND")
        or os.getenv("SPARKLESS_TEST_BACKEND")
        or ""
    ).strip().lower() == "pyspark"


# In non-PySpark mode, configure sparkless for multiprocessing (pytest-xdist).
# Both the sparkless package and its ``_configure_for_multiprocessing`` hook are
# optional: if either is missing, nothing is configured.
if not _is_pyspark_mode():
    try:
        import sparkless as _rs  # type: ignore[import-not-found]
    except ImportError:
        _rs = None  # type: ignore[assignment]
    # getattr(None, name, None) is None, so this single check also covers a
    # failed import.
    if getattr(_rs, "_configure_for_multiprocessing", None) is not None:
        _rs._configure_for_multiprocessing()

# Set JAVA_HOME for PySpark if not already set - must be done before any PySpark imports
if "JAVA_HOME" not in os.environ:
    java_home_candidates = [
        "/opt/homebrew/opt/openjdk@11",
        "/opt/homebrew/opt/openjdk@17",
        "/opt/homebrew/opt/openjdk",
    ]
    for candidate in java_home_candidates:
        java_bin_path = os.path.join(candidate, "bin", "java")
        if os.path.exists(java_bin_path):
            try:
                actual_java_path = os.path.realpath(java_bin_path)
                actual_java_bin = os.path.dirname(actual_java_path)
                actual_java_home = os.path.dirname(actual_java_bin)
                if os.path.exists(actual_java_home) and os.path.exists(
                    os.path.join(actual_java_home, "bin", "java")
                ):
                    os.environ["JAVA_HOME"] = actual_java_home
                    java_bin = os.path.join(actual_java_home, "bin")
                    if java_bin not in os.environ.get("PATH", ""):
                        os.environ["PATH"] = f"{java_bin}:{os.environ.get('PATH', '')}"
                    break
            except Exception:
                os.environ["JAVA_HOME"] = candidate
                java_bin = os.path.join(candidate, "bin")
                if java_bin not in os.environ.get("PATH", ""):
                    os.environ["PATH"] = f"{java_bin}:{os.environ.get('PATH', '')}"
                break


@pytest.fixture(scope="function", autouse=True)
def cleanup_after_each_test():
    """Run a garbage-collection pass after every test (autouse, function scope).

    Setup does nothing. Teardown calls ``gc.collect()``, so unreachable objects
    (including reference cycles) left by the test are collected before the next
    test starts.
    """
    yield None
    gc.collect()


def _ensure_robin_backend_type(session):
    """Set backend_type='robin' on session when SPARKLESS_TEST_BACKEND=robin."""
    if (os.environ.get("SPARKLESS_TEST_BACKEND") or "").strip().lower() == "robin":
        try:
            setattr(session, "backend_type", "robin")
        except AttributeError:
            pass


@pytest.fixture
def mock_spark_session():
    """Yield a mock-backend SparkSession named ``test_app``, stopping it afterwards.

    When ``SPARKLESS_TEST_BACKEND=robin``, the session is tagged with
    ``backend_type='robin'``.

    Raises:
        RuntimeError: Robin was requested but the session does not report
            ``backend_type == 'robin'``. The session is stopped before raising
            so the failure does not leak it.
    """
    from tests.fixtures.spark_backend import BackendType
    from tests.fixtures.spark_imports import get_spark_imports

    SparkSession = get_spark_imports(BackendType.MOCK).SparkSession
    session = SparkSession("test_app")
    _ensure_robin_backend_type(session)
    robin_requested = (
        os.environ.get("SPARKLESS_TEST_BACKEND") or ""
    ).strip().lower() == "robin"
    actual_backend = getattr(session, "backend_type", None)
    if robin_requested and actual_backend != "robin":
        # Release the session before failing; nothing else would stop it.
        with contextlib.suppress(BaseException):
            session.stop()
        raise RuntimeError(
            f"Robin mode was requested but mock_spark_session has backend_type={actual_backend!r}. "
            "SPARKLESS_BACKEND should be set by conftest."
        )
    yield session
    with contextlib.suppress(BaseException):
        session.stop()
    gc.collect()


class _SharedSessionWrapper:
    """Wraps a shared SparkSession so stop() is a no-op (prevents one test from killing the session for others)."""

    __slots__ = ("_session",)

    def __init__(self, session):
        self._session = session

    def stop(self):
        pass  # no-op so shared session is not stopped by individual tests

    def __getattr__(self, name):
        return getattr(self._session, name)


def _use_shared_session() -> bool:
    """Use a single session per worker/run for Robin backend.

    Default is per-test sessions so sequential and parallel (-n N) runs have the same
    pass/fail set and no cross-test catalog pollution. Set SPARKLESS_SHARED_SESSION=1
    to use one session per run (faster but requires unique table names via table_prefix).
    """
    if os.environ.get("SPARKLESS_SHARED_SESSION", "0").strip().lower() in (
        "1",
        "true",
        "yes",
    ):
        if os.environ.get("PYTEST_XDIST_WORKER"):
            return False
        return not _is_pyspark_mode()
    return False


@pytest.fixture(scope="session")
def _shared_robin_session():
    """Session-scoped Robin SparkSession shared by the whole test run.

    Skips when shared sessions are disabled, or when the env backend is set to
    anything other than Robin. Tests using it must keep table names unique via
    ``table_prefix``. The session is stopped (errors suppressed) at teardown.
    """
    from tests.fixtures.spark_backend import (
        BackendType,
        SparkBackend,
        get_backend_from_env,
    )

    if not _use_shared_session():
        pytest.skip("shared session disabled")
    if get_backend_from_env() not in (None, BackendType.ROBIN):
        pytest.skip("shared session only for Robin backend")

    shared = SparkBackend.create_mock_spark_session(
        "shared_robin_test", backend_type="robin"
    )
    _ensure_robin_backend_type(shared)
    yield shared

    with contextlib.suppress(BaseException):
        shared.stop()
    gc.collect()


@pytest.fixture(scope="session")
def _shared_pyspark_session():
    """One PySpark SparkSession for the whole run (session scope); use table_prefix for any tables/views.

    Skips in these cases:
    - Shared sessions are disabled (``_use_shared_session()`` is False, which
      includes pytest-xdist workers).
    - ``get_backend_from_env()`` is not ``BackendType.PYSPARK``.
    - An ImportError is raised inside the ``try`` and its message mentions
      pyspark. Any other ImportError propagates.

    In the ``finally`` clause, the session (if one was created) is stopped with
    errors suppressed, and ``gc.collect()`` runs.

    NOTE(review): ``_use_shared_session()`` returns False whenever the env selects
    PySpark (``_is_pyspark_mode()``). In env-driven PySpark mode this fixture
    therefore appears to always skip at its first check — confirm that is intended.
    """
    from tests.fixtures.spark_backend import (
        SparkBackend,
        BackendType,
        get_backend_from_env,
    )

    if not _use_shared_session():
        pytest.skip("shared session disabled")
    env_backend = get_backend_from_env()
    if env_backend != BackendType.PYSPARK:
        pytest.skip("shared PySpark session only for PySpark backend")
    session = None  # stays None if creation fails, so `finally` knows whether to stop it
    try:
        session = SparkBackend.create_pyspark_session(
            "shared_pyspark_worker",
            enable_delta=False,
        )
        yield session
    except ImportError as e:
        # Missing PySpark -> skip; unrelated import errors are re-raised.
        # (The second check is already implied by the lower-cased first one.)
        if "pyspark" in str(e).lower() or "PySpark is not available" in str(e):
            pytest.skip(f"PySpark not installed: {e}")
        raise
    finally:
        if session is not None:
            with contextlib.suppress(BaseException):
                session.stop()
        gc.collect()


@pytest.fixture
def table_prefix(request: pytest.FixtureRequest) -> str:
    """Return a unique, identifier-safe prefix for table/view names.

    The shape is ``t_<name>_<6 hex chars>``. ``<name>`` is the first 40
    characters of the test node name (``"test"`` if the node has none), with
    every character outside ``[a-zA-Z0-9_]`` replaced by ``_``. Meant for shared
    sessions (Robin or PySpark), e.g. ``saveAsTable(f'{table_prefix}_mytable')``.
    """
    raw_name = getattr(request.node, "name", "test")
    sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", raw_name[:40])
    suffix = uuid.uuid4().hex[:6]
    return "t_" + sanitized + "_" + suffix


@pytest.fixture
def isolated_session():
    """Yield a mock-backend SparkSession with a unique name, stopping it afterwards.

    The session name is ``test_isolated_<8 hex chars>``, generated per call.
    When ``SPARKLESS_TEST_BACKEND=robin``, the session is tagged with
    ``backend_type='robin'``.

    Raises:
        RuntimeError: Robin was requested but the session does not report
            ``backend_type == 'robin'``. The session is stopped before raising
            so the failure does not leak it.
    """
    from tests.fixtures.spark_backend import BackendType
    from tests.fixtures.spark_imports import get_spark_imports

    SparkSession = get_spark_imports(BackendType.MOCK).SparkSession
    session_name = f"test_isolated_{uuid.uuid4().hex[:8]}"
    session = SparkSession(session_name)
    _ensure_robin_backend_type(session)
    robin_requested = (
        os.environ.get("SPARKLESS_TEST_BACKEND") or ""
    ).strip().lower() == "robin"
    actual_backend = getattr(session, "backend_type", None)
    if robin_requested and actual_backend != "robin":
        # Release the session before failing; nothing else would stop it.
        with contextlib.suppress(BaseException):
            session.stop()
        raise RuntimeError(
            f"Robin mode was requested but isolated_session has backend_type={actual_backend!r}."
        )
    yield session
    with contextlib.suppress(BaseException):
        session.stop()
    gc.collect()


@pytest.fixture
def spark(request):
    """Unified SparkSession fixture: yield a session for this test's backend, then stop it.

    Resolution order:

    * If ``tests.fixtures.spark_backend`` cannot be imported, build a session
      straight from the env (``MOCK_SPARK_TEST_BACKEND``, then
      ``SPARKLESS_TEST_BACKEND``): real PySpark for ``pyspark``, otherwise sparkless.
    * Otherwise use ``get_backend_type(request)``, falling back to Robin if that
      raises AttributeError/TypeError. ``BOTH`` is treated as ``MOCK``.
    * With shared sessions enabled, Robin/PySpark tests get the session-scoped
      shared session wrapped in ``_SharedSessionWrapper``, whose stop() is a no-op.
    * Otherwise a per-test session is created with ``SparkBackend.create_session``.
      Known PySpark start-up failures skip the test.

    Raises:
        RuntimeError: Robin was requested but the session does not report
            ``backend_type == 'robin'``.
    """
    try:
        from tests.fixtures.spark_backend import (
            SparkBackend,
            BackendType,
            get_backend_type,
        )
    except ImportError:
        # Fallback: no fixtures package; create the session from the env (pyspark vs sparkless).
        _backend = (
            (
                os.getenv("MOCK_SPARK_TEST_BACKEND")
                or os.getenv("SPARKLESS_TEST_BACKEND")
                or ""
            )
            .strip()
            .lower()
        )
        if _backend == "pyspark":
            from pyspark.sql import SparkSession as _PySparkSession  # type: ignore[import-not-found]

            session = (
                _PySparkSession.builder.appName("test")
                .config("spark.driver.bindAddress", "127.0.0.1")
                .getOrCreate()
            )
        else:
            from sparkless.sql import SparkSession as _SparklessSession  # type: ignore[import-not-found]

            # ``builder`` may need to be called to get a builder; call it if it
            # is callable, otherwise use it as-is.
            _builder = getattr(
                _SparklessSession.builder,
                "__call__",
                lambda: _SparklessSession.builder,
            )()
            session = _builder.app_name("test").get_or_create()
        yield session
        with contextlib.suppress(BaseException):
            session.stop()
        gc.collect()
        return

    try:
        backend = get_backend_type(request)
    except (AttributeError, TypeError):
        backend = BackendType.ROBIN

    if backend == BackendType.BOTH:
        backend = BackendType.MOCK

    # Shared-session mode: reuse one session for the run (table names must be unique via table_prefix).
    if backend == BackendType.ROBIN and _use_shared_session():
        session = request.getfixturevalue("_shared_robin_session")
        if getattr(session, "backend_type", None) != "robin":
            raise RuntimeError(
                "Robin mode was requested but shared session has wrong backend_type."
            )
        yield _SharedSessionWrapper(session)
        return
    if backend == BackendType.PYSPARK and _use_shared_session():
        # NOTE(review): _use_shared_session() is False when the env selects PySpark,
        # and _shared_pyspark_session skips unless get_backend_from_env() is PYSPARK.
        # This branch therefore looks reachable only via a pyspark marker in a
        # non-pyspark env, where it would likely skip — confirm intended behavior.
        session = request.getfixturevalue("_shared_pyspark_session")
        yield _SharedSessionWrapper(session)
        return

    test_name = "test_app"
    if hasattr(request, "node") and hasattr(request.node, "name"):
        test_name = f"test_{request.node.name[:50]}"

    kwargs = {"enable_delta": False} if backend == BackendType.PYSPARK else {}
    try:
        session = SparkBackend.create_session(
            app_name=test_name,
            backend=backend,
            request=request if hasattr(request, "node") else None,
            **kwargs,
        )
    except (ImportError, RuntimeError) as e:
        error_msg = str(e)
        # Environment problems starting PySpark are skips, not failures.
        if (
            "Could not serialize" in error_msg
            or "pickle" in error_msg.lower()
            or "Java gateway" in error_msg
            or "Failed to create PySpark session" in error_msg
            or "PySpark is not available" in error_msg
            or "No module named 'pyspark'" in error_msg
        ):
            pytest.skip(f"PySpark session creation failed: {e}")
        raise

    if backend == BackendType.ROBIN:
        with contextlib.suppress(AttributeError):
            setattr(session, "backend_type", "robin")
        actual = getattr(session, "backend_type", None)
        if actual != "robin":
            # Stop the freshly created session before failing so it is not leaked.
            with contextlib.suppress(BaseException):
                session.stop()
            raise RuntimeError(
                f"Robin mode was requested but session has backend_type={actual!r}. "
                "Tests must not silently run in polars/mock when SPARKLESS_TEST_BACKEND=robin."
            )

    yield session

    with contextlib.suppress(BaseException):
        session.stop()
    gc.collect()


@pytest.fixture
def spark_backend(request):
    """Return the backend type that ``get_backend_type`` resolves for this test.

    Falls back to ``BackendType.MOCK`` if resolution raises AttributeError or
    TypeError. NOTE(review): the ``spark`` fixture falls back to ROBIN instead
    and maps BOTH to MOCK, so the two fixtures can disagree — confirm callers
    expect that.
    """
    from tests.fixtures.spark_backend import BackendType, get_backend_type

    try:
        resolved = get_backend_type(request)
    except (AttributeError, TypeError):
        resolved = BackendType.MOCK
    return resolved


@pytest.fixture
def pyspark_session(request):
    """Yield a real PySpark SparkSession (Delta disabled) for comparison tests.

    The test is skipped if creating the session raises ImportError or
    RuntimeError. At teardown the session is stopped (errors suppressed) and a
    GC pass runs.
    """
    from tests.fixtures.spark_backend import SparkBackend

    try:
        session = SparkBackend.create_pyspark_session("test_app", enable_delta=False)
    except (ImportError, RuntimeError) as e:
        pytest.skip(f"PySpark not available: {e}")
    yield session
    with contextlib.suppress(BaseException):
        session.stop()
    gc.collect()


@pytest.fixture
def mock_spark():
    """Yield a mock-backend SparkSession named ``test_app`` for compatibility tests.

    When ``SPARKLESS_TEST_BACKEND=robin``, the session is tagged with
    ``backend_type='robin'``. The session is stopped and a GC pass runs at teardown.

    Raises:
        RuntimeError: Robin was requested but the session does not report
            ``backend_type == 'robin'``. The session is stopped before raising
            so the failure does not leak it.
    """
    from tests.fixtures.spark_backend import BackendType
    from tests.fixtures.spark_imports import get_spark_imports

    SparkSession = get_spark_imports(BackendType.MOCK).SparkSession
    session = SparkSession("test_app")
    _ensure_robin_backend_type(session)
    robin_requested = (
        os.environ.get("SPARKLESS_TEST_BACKEND") or ""
    ).strip().lower() == "robin"
    actual_backend = getattr(session, "backend_type", None)
    if robin_requested and actual_backend != "robin":
        # Release the session before failing; nothing else would stop it.
        with contextlib.suppress(BaseException):
            session.stop()
        raise RuntimeError(
            f"Robin mode was requested but mock_spark has backend_type={actual_backend!r}."
        )
    yield session
    with contextlib.suppress(BaseException):
        session.stop()
    gc.collect()


@pytest.fixture
def temp_file_storage_path():
    """Yield the path ``<fresh temp dir>/test_storage`` for file-storage backend tests.

    Only the temporary parent directory is created; the ``test_storage`` path
    itself is not. The parent directory and its contents are removed after the test.
    """
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_root:
        yield os.path.join(tmp_root, "test_storage")


def pytest_configure(config):
    """Default the sparkless backend to Robin and register this suite's custom markers.

    ``SPARKLESS_BACKEND`` is set to ``"robin"`` when ``SPARKLESS_TEST_BACKEND`` is
    unset, empty, or ``robin`` (case/whitespace-insensitive). Markers are then
    registered in a fixed order via ``config.addinivalue_line("markers", ...)``.
    """
    requested_backend = (os.environ.get("SPARKLESS_TEST_BACKEND") or "").strip().lower()
    if requested_backend in ("", "robin"):
        os.environ["SPARKLESS_BACKEND"] = "robin"

    marker_descriptions = (
        "delta: mark test as requiring Delta Lake (may be skipped)",
        "performance: mark test as a performance benchmark",
        "compatibility: mark test as compatibility test using expected outputs",
        "unit: mark test as unit test (no external dependencies)",
        "timeout: mark tests that rely on pytest-timeout",
        "backend(mock|pyspark|both|robin): mark test to run with specific backend(s)",
        "integration: mark test as integration test (may require external setup)",
    )
    for description in marker_descriptions:
        config.addinivalue_line("markers", description)