import { core, internals } from "ext:core/mod.js";
const {
op_jupyter_get_connection_info,
op_jupyter_repl_evaluate,
op_jupyter_repl_get_properties,
op_jupyter_repl_global_lexical_scope_names,
op_jupyter_repl_call_function_on_args,
op_jupyter_repl_call_function_on,
op_jupyter_repl_interrupt,
op_jupyter_repl_cancel_interrupt,
op_jupyter_recv_iopub,
op_jupyter_recv_input,
op_jupyter_send_input_reply,
op_jupyter_deno_version,
op_jupyter_typescript_version,
} = core.ops;
const ENC = new TextEncoder();
const DEC = new TextDecoder();
async function readExact(conn, n) {
const buf = new Uint8Array(n);
let offset = 0;
while (offset < n) {
const nread = await conn.read(buf.subarray(offset));
if (nread === null) throw new Error("connection closed");
offset += nread;
}
return buf;
}
async function readFrame(conn) {
const flagBuf = await readExact(conn, 1);
const flag = flagBuf[0];
const isCommand = (flag & 0x04) !== 0;
const isLong = (flag & 0x02) !== 0;
const hasMore = (flag & 0x01) !== 0;
let size;
if (isLong) {
const b = await readExact(conn, 8);
const dv = new DataView(b.buffer);
size = dv.getUint32(4);
} else {
const b = await readExact(conn, 1);
size = b[0];
}
const data = await readExact(conn, size);
return { data, hasMore, isCommand };
}
function makeShortFrame(data, more, isCommand = false) {
const flag = (more ? 0x01 : 0x00) |
(data.length > 255 ? 0x02 : 0x00) |
(isCommand ? 0x04 : 0x00);
if (data.length > 255) {
const buf = new Uint8Array(9 + data.length);
buf[0] = flag;
new DataView(buf.buffer).setUint32(5, data.length);
buf.set(data, 9);
return buf;
}
const buf = new Uint8Array(2 + data.length);
buf[0] = flag;
buf[1] = data.length;
buf.set(data, 2);
return buf;
}
async function writeAll(conn, buf) {
let offset = 0;
while (offset < buf.length) {
offset += await conn.write(buf.subarray(offset));
}
}
async function sendFrames(conn, frames) {
const encoded = frames.map((frame, i) =>
makeShortFrame(frame, i < frames.length - 1)
);
const total = encoded.reduce((acc, f) => acc + f.length, 0);
const buf = new Uint8Array(total);
let offset = 0;
for (const f of encoded) {
buf.set(f, offset);
offset += f.length;
}
await writeAll(conn, buf);
}
function makeGreeting() {
const buf = new Uint8Array(64);
buf[0] = 0xff;
buf[9] = 0x7f;
buf[10] = 0x03; buf[11] = 0x00; buf.set(ENC.encode("NULL"), 12); buf[32] = 0x00; return buf;
}
function makeReadyCommand(socketType) {
const sockBytes = ENC.encode(socketType);
const nameBytes = ENC.encode("READY");
const propName = ENC.encode("Socket-Type");
const propLen = 1 + propName.length + 4 + sockBytes.length;
const body = new Uint8Array(1 + nameBytes.length + propLen);
let o = 0;
body[o++] = nameBytes.length;
body.set(nameBytes, o);
o += nameBytes.length;
body[o++] = propName.length;
body.set(propName, o);
o += propName.length;
new DataView(body.buffer).setUint32(o, sockBytes.length);
o += 4;
body.set(sockBytes, o);
return makeShortFrame(body, false, true); }
async function zmtpHandshake(conn, socketType) {
await writeAll(conn, makeGreeting());
await readExact(conn, 64);
await writeAll(conn, makeReadyCommand(socketType));
await readFrame(conn);
}
const DELIMITER = ENC.encode("<IDS|MSG>");
async function hmacSign(key, parts) {
if (!key || key.length === 0) return "";
const cryptoKey = await crypto.subtle.importKey(
"raw",
ENC.encode(key),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
);
const combined = new Uint8Array(
parts.reduce((acc, p) => acc + p.length, 0),
);
let offset = 0;
for (const p of parts) {
combined.set(p, offset);
offset += p.length;
}
const sig = await crypto.subtle.sign("HMAC", cryptoKey, combined);
return Array.from(new Uint8Array(sig))
.map((b) => b.toString(16).padStart(2, "0"))
.join("");
}
function makeHeader(session, msgType) {
return JSON.stringify({
msg_id: crypto.randomUUID(),
session,
date: new Date().toISOString(),
username: "kernel",
msg_type: msgType,
version: "5.3",
});
}
async function encodeMsg(
session,
key,
identities,
msgType,
content,
parentHeader = {},
metadata = {},
buffers = [],
) {
const header = makeHeader(session, msgType);
const parentHeaderStr = JSON.stringify(parentHeader);
const metaStr = JSON.stringify(metadata);
const contentStr = JSON.stringify(content);
const parts = [header, parentHeaderStr, metaStr, contentStr].map((s) =>
ENC.encode(s)
);
const sig = await hmacSign(key, parts);
const frames = [
...identities,
DELIMITER,
ENC.encode(sig),
ENC.encode(header),
ENC.encode(parentHeaderStr),
ENC.encode(metaStr),
ENC.encode(contentStr),
...buffers.map((b) => b instanceof Uint8Array ? b : new Uint8Array(b)),
];
return frames;
}
function decodeMsg(frames) {
let delimIdx = -1;
for (let i = 0; i < frames.length; i++) {
if (frames[i].length === DELIMITER.length) {
let match = true;
for (let j = 0; j < DELIMITER.length; j++) {
if (frames[i][j] !== DELIMITER[j]) {
match = false;
break;
}
}
if (match) {
delimIdx = i;
break;
}
}
}
if (delimIdx === -1) throw new Error("no <IDS|MSG> delimiter");
const identities = frames.slice(0, delimIdx);
const sig = DEC.decode(frames[delimIdx + 1]);
const header = JSON.parse(DEC.decode(frames[delimIdx + 2]));
const parentHeader = JSON.parse(DEC.decode(frames[delimIdx + 3]));
const metadata = JSON.parse(DEC.decode(frames[delimIdx + 4]));
const content = JSON.parse(DEC.decode(frames[delimIdx + 5]));
const buffers = frames.slice(delimIdx + 6);
return { identities, sig, header, parentHeader, metadata, content, buffers };
}
async function recvMultipart(conn) {
const frames = [];
while (true) {
const { data, hasMore } = await readFrame(conn);
frames.push(data);
if (!hasMore) break;
}
return frames;
}
async function runHeartbeat(port, ip) {
const listener = Deno.listen({ hostname: ip, port });
while (true) {
const conn = await listener.accept();
(async () => {
try {
await zmtpHandshake(conn, "REP");
while (true) {
const frames = await recvMultipart(conn);
await sendFrames(conn, frames);
}
} catch {
} finally {
try {
conn.close();
} catch { }
}
})();
}
}
function makeQueue() {
const items = [];
const waiters = [];
return {
push(item) {
if (waiters.length > 0) {
waiters.shift()(item);
} else {
items.push(item);
}
},
async pop() {
if (items.length > 0) return items.shift();
return new Promise((resolve) => waiters.push(resolve));
},
};
}
class RouterSocket {
constructor(port, ip) {
this.port = port;
this.ip = ip;
this.incoming = makeQueue();
this.peers = new Map(); this._listen();
}
_listen() {
(async () => {
const listener = Deno.listen({ hostname: this.ip, port: this.port });
while (true) {
const conn = await listener.accept();
this._handlePeer(conn);
}
})();
}
_handlePeer(conn) {
const peerId = crypto.getRandomValues(new Uint8Array(5));
const peerKey = Array.from(peerId).join(",");
this.peers.set(peerKey, conn);
(async () => {
try {
await zmtpHandshake(conn, "ROUTER");
while (true) {
const frames = await recvMultipart(conn);
this.incoming.push({ peerId, peerKey, frames });
}
} catch {
} finally {
this.peers.delete(peerKey);
try {
conn.close();
} catch { }
}
})();
}
async recv() {
return await this.incoming.pop();
}
async send(peerId, frames) {
const peerKey = Array.from(peerId).join(",");
const conn = this.peers.get(peerKey);
if (!conn) return;
try {
await sendFrames(conn, frames);
} catch {
this.peers.delete(peerKey);
try {
conn.close();
} catch { }
}
}
async sendAll(frames) {
const dead = [];
for (const [peerKey, conn] of this.peers) {
try {
await sendFrames(conn, frames);
} catch {
dead.push(peerKey);
}
}
for (const peerKey of dead) this.peers.delete(peerKey);
}
}
class PubSocket {
constructor(port, ip) {
this.port = port;
this.ip = ip;
this.conns = new Set();
this._listen();
}
_listen() {
(async () => {
const listener = Deno.listen({ hostname: this.ip, port: this.port });
while (true) {
const conn = await listener.accept();
(async () => {
try {
await zmtpHandshake(conn, "PUB");
const subFrame = await recvMultipart(conn);
void subFrame;
this.conns.add(conn);
while (true) {
try {
await recvMultipart(conn);
} catch {
break;
}
}
} catch {
} finally {
this.conns.delete(conn);
try {
conn.close();
} catch { }
}
})();
}
})();
}
async send(frames) {
const dead = [];
for (const conn of this.conns) {
try {
await sendFrames(conn, frames);
} catch {
dead.push(conn);
}
}
for (const conn of dead) this.conns.delete(conn);
}
}
async function startJupyterKernel() {
const info = JSON.parse(op_jupyter_get_connection_info());
const { ip, key, hb_port, shell_port, control_port, stdin_port, iopub_port } =
info;
const session = crypto.randomUUID();
runHeartbeat(hb_port, ip);
const shell = new RouterSocket(shell_port, ip);
const control = new RouterSocket(control_port, ip);
const iopub = new PubSocket(iopub_port, ip);
const stdin = new RouterSocket(stdin_port, ip);
let executionCount = 0;
let currentParentHeader = {};
let currentAllowStdin = false;
let shuttingDown = false;
async function publishStatus(status, parentHeader) {
const frames = await encodeMsg(
session,
key,
[],
"status",
{ execution_state: status },
parentHeader,
);
await iopub.send(frames);
}
async function publishIopub(msg) {
let msgType = msg.msg_type;
if (msgType === "stream_stdout" || msgType === "stream_stderr") {
msgType = "stream";
}
const frames = await encodeMsg(
session,
key,
[],
msgType,
msg.content,
currentParentHeader,
msg.metadata || {},
msg.buffers || [],
);
await iopub.send(frames);
}
async function iopubDrainer() {
while (true) {
const msg = await op_jupyter_recv_iopub();
if (msg !== null && msg !== undefined) {
try {
await publishIopub(msg);
} catch (e) {
void e;
}
}
}
}
iopubDrainer();
function kernelInfo() {
return {
status: "ok",
protocol_version: "5.3",
implementation: "Deno kernel",
implementation_version: op_jupyter_deno_version(),
language_info: {
name: "typescript",
version: op_jupyter_typescript_version(),
mimetype: "text/x.typescript",
file_extension: ".ts",
pygments_lexer: "typescript",
codemirror_mode: { name: "typescript" },
nbconvert_exporter: "script",
},
banner: "Welcome to Deno kernel",
help_links: [{ text: "Visit Deno manual", url: "https://docs.deno.com" }],
debugger: false,
};
}
function checkIsComplete(code) {
let stack = [];
let i = 0;
while (i < code.length) {
const ch = code[i];
if (ch === "/" && code[i + 1] === "/") {
while (i < code.length && code[i] !== "\n") i++;
continue;
}
if (ch === "/" && code[i + 1] === "*") {
i += 2;
let closed = false;
while (i < code.length - 1) {
if (code[i] === "*" && code[i + 1] === "/") {
i += 2;
closed = true;
break;
}
i++;
}
if (!closed) return { status: "incomplete", indent: "" };
continue;
}
if (ch === "'" || ch === '"' || ch === "`") {
const q = ch;
i++;
let closed = false;
while (i < code.length) {
if (code[i] === "\\" && q !== "`") {
i += 2;
continue;
}
if (code[i] === q) {
i++;
closed = true;
break;
}
i++;
}
if (!closed) return { status: "incomplete", indent: "" };
continue;
}
if (ch === "(" || ch === "[" || ch === "{") stack.push(ch);
if (ch === ")") {
if (stack.pop() !== "(") return { status: "invalid" };
}
if (ch === "]") {
if (stack.pop() !== "[") return { status: "invalid" };
}
if (ch === "}") {
if (stack.pop() !== "{") return { status: "invalid" };
}
i++;
}
if (stack.length === 0) return { status: "complete" };
return { status: "incomplete", indent: " " };
}
async function handleExecute(peerId, socket, msg) {
const { header: parentHeader, content } = msg;
currentParentHeader = parentHeader;
currentAllowStdin = content.allow_stdin === true;
const silent = content.silent || false;
const storeHistory = content.store_history !== false;
const code = content.code || "";
if (!silent && storeHistory) executionCount++;
await publishStatus("busy", parentHeader);
const inputFrames = await encodeMsg(
session,
key,
[],
"execute_input",
{ code, execution_count: executionCount },
parentHeader,
);
await iopub.send(inputFrames);
let evalResult = null;
try {
evalResult = await op_jupyter_repl_evaluate(code);
} catch (e) {
const errFrames = await encodeMsg(
session,
key,
[],
"error",
{
ename: e?.name || "Error",
evalue: e?.message || String(e),
traceback: [],
},
parentHeader,
);
await iopub.send(errFrames);
const replyContent = {
status: "error",
execution_count: executionCount,
ename: e?.name || "Error",
evalue: e?.message || String(e),
traceback: [],
};
const replyFrames = await encodeMsg(
session,
key,
[],
"execute_reply",
replyContent,
parentHeader,
);
await socket.send(peerId, replyFrames);
await publishStatus("idle", parentHeader);
return;
}
if (evalResult !== null && evalResult !== undefined) {
const exDetails = evalResult?.value?.exceptionDetails;
if (exDetails) {
const exception = exDetails.exception;
let ename = "Error";
let evalue = "(none)";
let traceback = [];
if (exception) {
const callResult = await op_jupyter_repl_call_function_on_args(
`function(object) {
if (object instanceof Error) {
const name = "name" in object ? String(object.name) : "";
const message = "message" in object ? String(object.message) : "";
const stack = "stack" in object ? String(object.stack) : "";
return JSON.stringify({ name, message, stack });
} else {
return JSON.stringify({ name: "", message: String(object), stack: "" });
}
}`,
[exception],
);
if (callResult?.result?.value) {
try {
const parsed = JSON.parse(callResult.result.value);
ename = parsed.name || "Error";
evalue = parsed.message || "(none)";
traceback = (parsed.stack || "").split("\n");
} catch { }
}
} else {
ename = exDetails.text || "Error";
evalue = exDetails.text || "(none)";
}
const errFrames = await encodeMsg(
session,
key,
[],
"error",
{ ename, evalue, traceback },
parentHeader,
);
await iopub.send(errFrames);
const replyFrames = await encodeMsg(
session,
key,
[],
"execute_reply",
{
status: "error",
execution_count: executionCount,
ename,
evalue,
traceback,
},
parentHeader,
);
await socket.send(peerId, replyFrames);
} else {
const result = evalResult?.value?.result;
if (result && !silent) {
const arg0 = { value: executionCount };
const arg1 = result.objectId
? { objectId: result.objectId }
: { value: result.value };
await op_jupyter_repl_call_function_on(arg0, arg1);
}
const replyFrames = await encodeMsg(
session,
key,
[],
"execute_reply",
{
status: "ok",
execution_count: executionCount,
payload: [],
user_expressions: {},
},
parentHeader,
);
await socket.send(peerId, replyFrames);
}
} else {
const replyFrames = await encodeMsg(
session,
key,
[],
"execute_reply",
{
status: "error",
execution_count: executionCount,
ename: "Error",
evalue: "Execution failed",
traceback: [],
},
parentHeader,
);
await socket.send(peerId, replyFrames);
}
await publishStatus("idle", parentHeader);
}
async function shellLoop(socket) {
while (!shuttingDown) {
const { peerId, frames } = await socket.recv();
try {
await handleShellMessage(socket, peerId, frames);
} catch (err) {
void err;
}
}
}
async function handleShellMessage(socket, peerId, frames) {
const msg = decodeMsg(frames);
const msgType = msg.header?.msg_type;
const parentHeader = msg.header;
if (msgType === "execute_request") {
await handleExecute(peerId, socket, msg);
return;
}
await publishStatus("busy", parentHeader);
try {
if (msgType === "kernel_info_request") {
const replyFrames = await encodeMsg(
session,
key,
[],
"kernel_info_reply",
kernelInfo(),
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "complete_request") {
const userCode = msg.content?.code || "";
const cursorPos = msg.content?.cursor_pos || userCode.length;
const expr = getExprFromLineAtPos(userCode, cursorPos);
let completions = [];
let cursorStart = cursorPos;
if (expr.includes(".")) {
const dotIdx = expr.lastIndexOf(".");
const subExpr = expr.slice(0, dotIdx);
const propName = expr.slice(dotIdx + 1);
const props = await getExprProperties(subExpr);
completions = props.filter((n) =>
!n.startsWith("Symbol(") && n.startsWith(propName)
);
cursorStart = cursorPos - propName.length;
} else {
const globalProps = await getExprProperties("globalThis");
const lexicalNames = await getLexicalScopeNames();
const allNames = [...new Set([...globalProps, ...lexicalNames])];
completions = allNames.filter((n) => n.startsWith(expr)).sort();
cursorStart = cursorPos - expr.length;
}
const replyFrames = await encodeMsg(
session,
key,
[],
"complete_reply",
{
status: "ok",
matches: completions,
cursor_start: cursorStart,
cursor_end: cursorPos,
metadata: {},
},
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "is_complete_request") {
const result = checkIsComplete(msg.content?.code || "");
const replyFrames = await encodeMsg(
session,
key,
[],
"is_complete_reply",
result,
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "inspect_request") {
const replyFrames = await encodeMsg(
session,
key,
[],
"inspect_reply",
{
status: "ok",
found: false,
data: {},
metadata: {},
},
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "history_request") {
const replyFrames = await encodeMsg(
session,
key,
[],
"history_reply",
{ status: "ok", history: [] },
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "comm_info_request") {
const replyFrames = await encodeMsg(
session,
key,
[],
"comm_info_reply",
{ status: "ok", comms: {} },
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "comm_open") {
const replyFrames = await encodeMsg(
session,
key,
[],
"comm_close",
{ comm_id: msg.content?.comm_id, data: {} },
parentHeader,
);
await socket.send(peerId, replyFrames);
}
} finally {
await publishStatus("idle", parentHeader);
}
}
async function controlLoop(socket) {
while (true) {
const { peerId, frames } = await socket.recv();
try {
await handleControlMessage(socket, peerId, frames);
} catch (err) {
void err;
}
}
}
async function handleControlMessage(socket, peerId, frames) {
const msg = decodeMsg(frames);
const msgType = msg.header?.msg_type;
const parentHeader = msg.header;
if (msgType === "kernel_info_request") {
const replyFrames = await encodeMsg(
session,
key,
[],
"kernel_info_reply",
kernelInfo(),
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "shutdown_request") {
const restart = msg.content?.restart || false;
const replyFrames = await encodeMsg(
session,
key,
[],
"shutdown_reply",
{ status: "ok", restart },
parentHeader,
);
await socket.send(peerId, replyFrames);
shuttingDown = true;
setTimeout(() => Deno.exit(0), 100);
} else if (msgType === "interrupt_request") {
op_jupyter_repl_interrupt();
const replyFrames = await encodeMsg(
session,
key,
[],
"interrupt_reply",
{ status: "ok" },
parentHeader,
);
await socket.send(peerId, replyFrames);
} else if (msgType === "debug_request") {
}
}
async function getExprProperties(expr) {
const evalResp = await op_jupyter_repl_get_properties(
await evaluateExprForObjectId(expr),
);
if (!evalResp?.result) return [];
return evalResp.result.map((p) => p.name);
}
async function evaluateExprForObjectId(expr) {
try {
const resp = await op_jupyter_repl_evaluate(
`(${expr})`, );
return resp?.value?.result?.objectId || null;
} catch {
return null;
}
}
async function getLexicalScopeNames() {
const resp = await op_jupyter_repl_global_lexical_scope_names();
return resp?.names || [];
}
function getExprFromLineAtPos(line, cursorPos) {
const sub = line.slice(0, cursorPos);
const start = sub.search(/[\w$._]+$/);
if (start === -1) return "";
return sub.slice(start);
}
async function stdinLoop() {
while (!shuttingDown) {
const req = await op_jupyter_recv_input();
if (req === null || req === undefined) break;
let value = null;
try {
value = await requestInput(req);
} catch (err) {
void err;
}
op_jupyter_send_input_reply(value);
}
}
async function requestInput(req) {
if (!currentAllowStdin || !currentParentHeader.msg_id) {
return null;
}
if (stdin.peers.size === 0) {
await new Promise((r) => setTimeout(r, 100));
}
if (stdin.peers.size === 0) {
return null;
}
const reqFrames = await encodeMsg(
session,
key,
[],
"input_request",
{ prompt: req.prompt, password: req.password },
currentParentHeader,
);
await stdin.sendAll(reqFrames);
while (true) {
const { frames } = await stdin.recv();
try {
const reply = decodeMsg(frames);
if (reply.header?.msg_type === "input_reply") {
const raw = reply.content?.value;
return typeof raw === "string" ? raw : null;
}
} catch {
return null;
}
}
}
await Promise.all([
shellLoop(shell),
controlLoop(control),
stdinLoop(),
]);
}
internals.startJupyterKernel = startJupyterKernel;