const { op_net_listen, op_net_accept, op_net_close, op_net_read, op_net_write } = Deno.core.ops;
class Server extends EventEmitter {
#rid = null;
#address = null;
#connections = 0;
#closed = false;
#listening = false;
constructor(options, connectionListener) {
super();
if (typeof options === 'function') {
connectionListener = options;
options = {};
}
this.options = options || {};
if (connectionListener) {
this.on('connection', connectionListener);
}
}
listen(...args) {
const port = typeof args[0] === 'number' ? args[0] : 0;
const host = typeof args[1] === 'string' ? args[1] : '0.0.0.0';
const callback = args.find(arg => typeof arg === 'function');
try {
const addr = `${host}:${port}`;
this.#rid = op_net_listen(addr);
this.#listening = true;
this.#address = { port, address: host, family: 'IPv4' };
if (callback) this.once('listening', callback);
queueMicrotask(() => this.emit('listening'));
this.#acceptLoop();
} catch (err) {
queueMicrotask(() => this.emit('error', err));
}
return this;
}
async #acceptLoop() {
while (!this.#closed) {
try {
const connRid = await op_net_accept(this.#rid);
const connection = {
rid: connRid,
write: (data) => {
const buf = typeof data === 'string' ? new TextEncoder().encode(data) : data;
return op_net_write(connRid, buf);
},
end: () => op_net_close(connRid),
onData: null,
onEnd: null
};
this.emit('connection', connection);
this.#startReadLoop(connection);
} catch (e) {
if (this.#closed) break;
this.emit('error', e);
}
}
}
async #startReadLoop(conn) {
const buffer = new Uint8Array(65536);
try {
while (true) {
const nread = await op_net_read(conn.rid, buffer);
if (nread === 0) {
if (conn.onEnd) conn.onEnd();
break;
}
if (conn.onData) {
conn.onData(buffer.subarray(0, nread));
}
}
} catch (e) {
console.error("Read error on RID:", conn.rid, e.message);
}
}
address() {
if (!this.#rid) return null;
return Deno.core.ops.op_net_server_address(this.#rid);
}
getConnections(callback) {
if (typeof callback !== 'function') {
throw new TypeError('getConnections: argument must be a function');
}
queueMicrotask(() => {
callback(null, this.#connections.size);
});
}
close(callback) {
console.log("[Server] close() called");
if (callback) this.once('close', callback);
if (!this.#closed && this.#rid !== null) {
this.#closed = true;
this.#listening = false;
op_net_close(this.#rid);
this.#rid = null;
queueMicrotask(() => this.emit('close'));
}
return this;
}
}
class Socket extends EventEmitter {
#rid;
#writable = true;
#connecting = false;
constructor(rid) {
super();
this.#rid = rid;
this.#readLoop();
}
write(data, encoding = 'utf8', callback) {
if (!this.#writable) {
throw new Error('Socket is not writable');
}
const buffer = typeof data === 'string'
? new TextEncoder().encode(data)
: data;
(async () => {
try {
const nwritten = await op_net_write(this.#rid, buffer);
if (callback) callback(null, nwritten);
} catch (e) {
if (callback) callback(e);
this.emit('error', e);
}
})();
return true;
}
async #readLoop() {
const buf = new Uint8Array(65536);
try {
while (true) {
const nread = await Deno.core.ops.op_net_read(this.#rid, buf);
if (nread === 0) {
this.emit('end');
this.destroy();
break;
}
this.emit('data', buf.slice(0, nread));
}
} catch (e) {
this.emit('error', e);
}
}
connect(port, host = '127.0.0.1', connectionListener) {
if (this.#rid || this.#connecting) throw new Error('Socket is already handle or connecting');
this.#connecting = true;
if (connectionListener) this.on('connect', connectionListener);
const addr = `${host}:${port}`;
(async () => {
try {
this.#rid = await Deno.core.ops.op_net_connect(addr);
this.#connecting = false;
this.emit('connect');
this.#readLoop();
} catch (e) {
this.#connecting = false;
this.emit('error', e);
}
})();
return this;
}
destroy() {
this.#writable = false;
op_net_close(this.#rid);
this.emit('close');
}
}
globalThis.net = { Server, Socket };