import { Socket } from "node:net";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { codeMap, UV_ECANCELED } from "ext:deno_node/internal_binding/uv.ts";
import { setImmediate } from "node:timers";
const kCurrentWriteRequest = Symbol("kCurrentWriteRequest");
const kCurrentShutdownRequest = Symbol("kCurrentShutdownRequest");
const kPendingShutdownRequest = Symbol("kPendingShutdownRequest");
const kPendingClose = Symbol("kPendingClose");
const kJSStreamHandle = Symbol.for("kJSStreamHandle");
function isClosing() {
return this[kOwner].isClosing();
}
function onreadstart() {
return this[kOwner].readStart();
}
function onreadstop() {
return this[kOwner].readStop();
}
const kOwner = Symbol.for("kJSStreamOwner");
class JSStreamSocket extends Socket {
constructor(stream) {
const handle = {
[kJSStreamHandle]: true,
[kOwner]: null, close(cb) {
handle[kOwner].doClose(cb);
},
isClosing,
onreadstart,
onreadstop,
readStart() {
return handle[kOwner].readStart();
},
readStop() {
return handle[kOwner].readStop();
},
readBuffer: null,
emitEOF: null,
reading: false,
};
stream.pause();
stream.on("error", (err) => this.emit("error", err));
const ondata = (chunk) => {
if (
typeof chunk === "string" ||
stream.readableObjectMode === true
) {
stream.pause();
stream.removeListener("data", ondata);
this.emit("error", new Error("Stream is not in binary mode"));
return;
}
if (this._handle && this._handle.readBuffer) {
this._handle.readBuffer(chunk);
}
};
stream.on("data", ondata);
stream.once("end", () => {
if (this._handle && this._handle.emitEOF) {
this._handle.emitEOF();
}
});
stream.once("close", () => {
this.destroy();
});
super({ handle, manualStart: true });
handle[kOwner] = this;
this.stream = stream;
this[kCurrentWriteRequest] = null;
this[kCurrentShutdownRequest] = null;
this[kPendingShutdownRequest] = null;
this[kPendingClose] = false;
this.readable = stream.readable;
this.writable = stream.writable;
this.read(0);
}
isClosing() {
return !this.readable || !this.writable;
}
readStart() {
this.stream.resume();
return 0;
}
readStop() {
this.stream.pause();
return 0;
}
doShutdown(req) {
if (this[kCurrentWriteRequest] !== null) {
this[kPendingShutdownRequest] = req;
return 0;
}
this[kCurrentShutdownRequest] = req;
if (this[kPendingClose]) {
return 0;
}
const handle = this._handle;
nextTick(() => {
this.stream.end(() => {
this.finishShutdown(handle, 0);
});
});
return 0;
}
finishShutdown(_handle, _errCode) {
if (this[kCurrentShutdownRequest] === null) return;
this[kCurrentShutdownRequest] = null;
}
doWrite(req, bufs) {
if (this[kPendingClose]) {
this[kCurrentWriteRequest] = req;
return 0;
} else if (this._handle === null) {
return 0;
}
const handle = this._handle;
const self = this;
let pending = bufs.length;
this.stream.cork();
for (let i = 0; i < bufs.length; ++i) {
this.stream.write(bufs[i], done);
}
this.stream.uncork();
this[kCurrentWriteRequest] = req;
function done(err) {
if (!err && --pending !== 0) return;
pending = 0;
let errCode = 0;
if (err) {
errCode = codeMap.get(err.code) || codeMap.get("EPIPE");
}
setImmediate(() => {
self.finishWrite(handle, errCode);
});
}
return 0;
}
finishWrite(_handle, _errCode) {
if (this[kCurrentWriteRequest] === null) return;
this[kCurrentWriteRequest] = null;
if (this[kPendingShutdownRequest]) {
const req = this[kPendingShutdownRequest];
this[kPendingShutdownRequest] = null;
this.doShutdown(req);
}
}
doClose(cb) {
this[kPendingClose] = true;
const handle = this._handle;
this.stream.destroy();
setImmediate(() => {
this.finishWrite(handle, UV_ECANCELED);
this.finishShutdown(handle, UV_ECANCELED);
this[kPendingClose] = false;
if (cb) cb();
});
}
}
export { JSStreamSocket, kJSStreamHandle, kOwner };
export default JSStreamSocket;