import json
import time
import subprocess
import tempfile
import os
from pathlib import Path
import pytest
from jupyter_client import KernelManager
@pytest.fixture(scope="session")
def kernel_binary():
repo_root = Path(__file__).parent.parent.parent
result = subprocess.run(
["cargo", "build", "--bin", "ggsql-jupyter"],
cwd=repo_root / "ggsql-jupyter",
capture_output=True,
text=True,
)
if result.returncode != 0:
pytest.fail(f"Failed to build kernel: {result.stderr}")
binary_path = repo_root / "target" / "debug" / "ggsql-jupyter"
if not binary_path.exists():
pytest.fail(f"Kernel binary not found at {binary_path}")
return str(binary_path)
@pytest.fixture
def kernel_manager(kernel_binary):
kernel_process = None
try:
km = KernelManager()
km.write_connection_file()
connection_file = km.connection_file
kernel_process = subprocess.Popen(
[kernel_binary, "-f", connection_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
km._kernel_process = kernel_process
time.sleep(3)
if kernel_process.poll() is not None:
stdout, stderr = kernel_process.communicate()
pytest.fail(
f"Kernel failed to start:\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}"
)
yield km
finally:
if kernel_process is not None:
try:
kernel_process.terminate()
kernel_process.wait(timeout=5)
except:
try:
kernel_process.kill()
except:
pass
try:
os.unlink(km.connection_file)
except:
pass
@pytest.fixture
def client(kernel_manager):
kc = kernel_manager.client()
kc.start_channels()
kc.wait_for_ready(timeout=10)
yield kc
kc.stop_channels()
class TestKernelInfo:
def test_kernel_info_request(self, client):
msg_id = client.kernel_info()
reply = client.get_shell_msg(timeout=5)
assert reply["msg_type"] == "kernel_info_reply"
assert reply["parent_header"]["msg_id"] == msg_id
content = reply["content"]
assert content["status"] == "ok"
assert content["protocol_version"] == "5.3"
assert content["implementation"] == "ggsql-jupyter"
lang_info = content["language_info"]
assert lang_info["name"] == "ggsql"
assert lang_info["file_extension"] == ".ggsql"
assert lang_info["mimetype"] == "text/x-ggsql"
class TestExecution:
def test_simple_sql_execution(self, client):
code = "SELECT 1 as num, 'test' as text"
msg_id = client.execute(code, silent=False, store_history=True)
messages = []
for _ in range(10): try:
msg = client.get_iopub_msg(timeout=2)
messages.append(msg)
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
msg_types = [msg["msg_type"] for msg in messages]
assert "status" in msg_types assert "execute_input" in msg_types
reply = client.get_shell_msg(timeout=5)
assert reply["msg_type"] == "execute_reply"
assert reply["content"]["status"] == "ok"
assert reply["content"]["execution_count"] >= 1
def test_visualization_execution(self, client):
code = """
SELECT 1 as x, 2 as y
VISUALISE x, y
DRAW point
"""
msg_id = client.execute(code, silent=False, store_history=True)
execute_result = None
for _ in range(10):
try:
msg = client.get_iopub_msg(timeout=2)
if msg["msg_type"] == "execute_result":
execute_result = msg
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
assert execute_result is not None
content = execute_result["content"]
assert "data" in content
data = content["data"]
assert "application/vnd.vegalite.v6+json" in data
vega_spec = data["application/vnd.vegalite.v6+json"]
assert "$schema" in vega_spec
assert "data" in vega_spec
assert "mark" in vega_spec or "layer" in vega_spec
def test_error_handling(self, client):
code = "SELECT * FROM nonexistent_table"
msg_id = client.execute(code, silent=False, store_history=True)
error_msg = None
for _ in range(10):
try:
msg = client.get_iopub_msg(timeout=2)
if msg["msg_type"] == "error":
error_msg = msg
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
assert error_msg is not None
content = error_msg["content"]
assert "ename" in content
assert "evalue" in content
assert "traceback" in content
def test_persistent_state(self, client):
code1 = "CREATE TABLE test_table (id INTEGER, name VARCHAR)"
client.execute(code1, silent=False, store_history=True)
for _ in range(5):
try:
msg = client.get_iopub_msg(timeout=2)
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
try:
client.get_shell_msg(timeout=1)
except:
pass
code2 = "INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')"
client.execute(code2, silent=False, store_history=True)
for _ in range(5):
try:
msg = client.get_iopub_msg(timeout=2)
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
try:
client.get_shell_msg(timeout=1)
except:
pass
code3 = "SELECT * FROM test_table"
client.execute(code3, silent=False, store_history=True)
reply = client.get_shell_msg(timeout=5)
assert reply["content"]["status"] == "ok"
class TestStatus:
def test_status_busy_idle(self, client):
code = "SELECT 1"
client.execute(code, silent=False, store_history=True)
statuses = []
for _ in range(10):
try:
msg = client.get_iopub_msg(timeout=2)
if msg["msg_type"] == "status":
statuses.append(msg["content"]["execution_state"])
if msg["content"]["execution_state"] == "idle":
break
except:
break
assert "busy" in statuses
assert "idle" in statuses
assert statuses.index("busy") < statuses.index("idle")
class TestShutdown:
def test_shutdown_request(self, kernel_manager):
kc = kernel_manager.client()
kc.start_channels()
try:
kc.wait_for_ready(timeout=10)
msg_id = kc.shutdown()
try:
reply = kc.get_shell_msg(timeout=10)
assert reply["msg_type"] == "shutdown_reply"
assert reply["content"]["status"] == "ok"
assert "restart" in reply["content"]
except:
time.sleep(2)
kernel_process = kernel_manager._kernel_process
if kernel_process.poll() is None:
kernel_process.terminate()
kernel_process.wait(timeout=5)
finally:
kc.stop_channels()
class TestExecuteInput:
def test_execute_input_echoed(self, client):
code = "SELECT 42 as answer"
msg_id = client.execute(code, silent=False, store_history=True)
execute_input = None
for _ in range(10):
try:
msg = client.get_iopub_msg(timeout=2)
if msg["msg_type"] == "execute_input":
execute_input = msg
break
if (
msg["msg_type"] == "status"
and msg["content"]["execution_state"] == "idle"
):
break
except:
break
assert execute_input is not None
content = execute_input["content"]
assert content["code"] == code
assert "execution_count" in content
class TestHeartbeat:
def test_heartbeat_responsive(self, kernel_manager):
kernel_process = kernel_manager._kernel_process
assert kernel_process.poll() is None, "Kernel process terminated unexpectedly"
time.sleep(1)
assert kernel_process.poll() is None, "Kernel process terminated after 1 second"
if __name__ == "__main__":
pytest.main([__file__, "-v"])