import { addAbortSignalNoValidate } from "__wasm_rquickjs_builtin/internal/streams/add-abort-signal";
import { Buffer } from "buffer";
import { getDefaultHighWaterMark, getHighWaterMark } from "__wasm_rquickjs_builtin/internal/streams/state";
import { isUint8Array } from "__wasm_rquickjs_builtin/internal/util/types";
import { Stream } from "__wasm_rquickjs_builtin/internal/streams/legacy";
import {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
ERR_STREAM_PREMATURE_CLOSE,
AbortError,
} from "__wasm_rquickjs_builtin/internal/errors";
import { isDestroyed, isWritable, isWritableEnded } from "__wasm_rquickjs_builtin/internal/streams/utils";
import { validateObject, validateBoolean } from "__wasm_rquickjs_builtin/internal/validators";
import eos from "__wasm_rquickjs_builtin/internal/streams/end-of-stream";
import destroyImpl from "__wasm_rquickjs_builtin/internal/streams/destroy";
import EventEmitter from "events";
import Readable from "__wasm_rquickjs_builtin/internal/streams/readable";
import { nextTick } from "node:process";
/**
 * Re-wraps the bytes of a typed-array view as a Buffer.
 *
 * The Buffer is built from the view's underlying ArrayBuffer together with
 * its offset and length, so it covers exactly the bytes the view covers.
 *
 * @param {ArrayBufferView} chunk View whose bytes should be exposed.
 * @returns {Buffer} Buffer over `chunk.buffer[byteOffset, byteOffset + byteLength)`.
 */
function _uint8ArrayToBuffer(chunk) {
  const { buffer, byteOffset, byteLength } = chunk;
  return Buffer.from(buffer, byteOffset, byteLength);
}
// Error-reporting helper taken from the shared destroy implementation; it is
// called throughout this file with the stream and the error to report.
const { errorOrDestroy } = destroyImpl;
/**
 * Decides whether a value should be treated as a duplex stream.
 *
 * Two conditions must both hold: `Readable.prototype` is in the value's
 * prototype chain, and some object on that chain (starting with the value
 * itself) has a `constructor` whose `name` is exactly "Duplex". The walk stops
 * at the first link without a constructor or whose constructor is named
 * "Object".
 *
 * @param {*} maybe_duplex Candidate value; may be null or undefined.
 * @returns {boolean} True only when both conditions hold.
 */
function isDuplexStream(maybe_duplex) {
  const inheritsReadable = Readable.prototype.isPrototypeOf(maybe_duplex);
  let foundDuplexCtor = false;
  for (
    let link = maybe_duplex;
    link?.constructor && link.constructor.name !== "Object";
    link = Object.getPrototypeOf(link)
  ) {
    if (link.constructor.name === "Duplex") {
      foundDuplexCtor = true;
      break;
    }
  }
  return inheritsReadable && foundDuplexCtor;
}
// Shared do-nothing callback. `_write` substitutes it when the caller passes no
// callback, and `writeOrBuffer`/`clearBuffer` compare against it by identity
// (`allNoop`) to skip building a fan-out callback for batched writes.
function nop() { }
// Private key under which WritableState keeps the `end()` callbacks that are
// still waiting for the stream to finish (or fail).
const kOnFinished = Symbol("kOnFinished");
/**
 * Constructor for the bookkeeping record behind the writable side of a stream.
 *
 * @param {object} [options] Stream options; every field read here is optional.
 * @param {object} stream The owning stream; bound as the first argument of `onwrite`.
 * @param {boolean} [isDuplex] Whether `stream` is a duplex. When not a boolean
 *   it is detected with `isDuplexStream(stream)`.
 * @throws {ERR_UNKNOWN_ENCODING} When the resolved default encoding is rejected
 *   by `Buffer.isEncoding`.
 */
function WritableState(options, stream, isDuplex) {
if (typeof isDuplex !== "boolean") {
isDuplex = isDuplexStream(stream);
}
// In object mode every chunk counts as 1 towards `length` instead of its byte length.
this.objectMode = !!(options && options.objectMode);
if (isDuplex) {
// Duplex streams may enable object mode for the writable side only.
this.objectMode = this.objectMode ||
!!(options && options.writableObjectMode);
}
// Threshold at which write() starts returning false (see writeOrBuffer).
this.highWaterMark = options
? getHighWaterMark(this, options, "writableHighWaterMark", isDuplex)
: getDefaultHighWaterMark(false);
// Set once prefinish() has decided to invoke the stream's _final() hook.
this.finalCalled = false;
// Set when a write reached the high-water mark; cleared when 'drain' is emitted.
this.needDrain = false;
// Set by end() before finishing is attempted.
this.ending = false;
// Set by end() right after its finishMaybe() call.
this.ended = false;
// Set by finish(), just before 'finish' is emitted.
this.finished = false;
this.destroyed = false;
// Strings are converted to Buffers in _write() unless decodeStrings is exactly false.
const noDecode = !!(options && options.decodeStrings === false);
this.decodeStrings = !noDecode;
this.defaultEncoding = (options && options.defaultEncoding) || "utf8";
if (!Buffer.isEncoding(this.defaultEncoding)) {
throw new ERR_UNKNOWN_ENCODING(this.defaultEncoding);
}
// Total size of chunks accepted but not yet acknowledged by the sink.
this.length = 0;
// True while a _write()/_writev() call has not yet invoked its callback.
this.writing = false;
// cork() nesting depth; buffered chunks are only flushed while this is 0.
this.corked = 0;
// True while user hooks are being invoked synchronously; onwrite() reads it to
// decide whether completion callbacks must be deferred to a later tick.
this.sync = true;
// Re-entrancy guard for clearBuffer().
this.bufferProcessing = false;
// Completion callback handed to _write()/_writev(), pre-bound to the stream.
this.onwrite = onwrite.bind(undefined, stream);
// Callback and size of the write currently in flight.
this.writecb = null;
this.writelen = 0;
// Coalesces consecutive synchronous completions sharing a callback into one tick.
this.afterWriteTickInfo = null;
resetBuffer(this);
// Number of callbacks still owed; finishMaybe() only finishes at zero.
this.pendingcb = 0;
// NOTE(review): presumably toggled by destroyImpl.construct() while an async
// _construct() hook is pending — confirm against the destroy module.
this.constructed = true;
this.prefinished = false;
this.errorEmitted = false;
// Both flags default to true unless the option is explicitly false.
this.emitClose = !options || options.emitClose !== false;
this.autoDestroy = !options || options.autoDestroy !== false;
// First error recorded for the stream, or null.
this.errored = null;
this.closed = false;
this.closeEmitted = false;
// end() callbacks waiting for the stream to finish.
this[kOnFinished] = [];
}
/**
 * Puts the write queue of `state` back into its empty configuration: a fresh
 * `buffered` array, a zero read index, and both "all entries are ..." flags
 * set to true.
 *
 * @param {object} state Writable state record to reset (mutated in place).
 */
function resetBuffer(state) {
  Object.assign(state, {
    buffered: [],
    bufferedIndex: 0,
    allBuffers: true,
    allNoop: true,
  });
}
/**
 * Returns a copy of the queue entries that have not been handed to the sink
 * yet, i.e. everything from `bufferedIndex` onwards.
 *
 * @returns {Array} New array; mutating it does not affect the queue.
 */
WritableState.prototype.getBuffer = function getBuffer() {
  const { buffered, bufferedIndex } = this;
  return buffered.slice(bufferedIndex);
};
// Read-only count of the queue entries still waiting to be written.
Object.defineProperty(WritableState.prototype, "bufferedRequestCount", {
  get() {
    const queuedTotal = this.buffered.length;
    return queuedTotal - this.bufferedIndex;
  },
});
/**
 * Writable stream constructor.
 *
 * @param {object} [options] Stream options. `write`, `writev`, `destroy`,
 *   `final` and `construct` functions are installed as the matching
 *   underscore-prefixed hooks; `signal` is wired up for abort handling; the
 *   rest is consumed by WritableState and the base Stream constructor.
 * @returns {Writable|undefined} A new instance when called without `new` on a
 *   non-Writable receiver; otherwise nothing (normal constructor use).
 */
function Writable(options) {
const isDuplex = isDuplexStream(this);
// Allow `Writable(options)` without `new`. The ordinary (non-overridden)
// instanceof check is used on purpose, and duplex receivers are let through
// so they can call this constructor on themselves.
if (
!isDuplex && !Function.prototype[Symbol.hasInstance].call(Writable, this)
) {
return new Writable(options);
}
// Pre-create the listener table with the event names this file emits.
// NOTE(review): presumably an object-shape optimisation carried over from
// upstream — confirm it is still wanted on this runtime.
if (!this._events) {
this._events = Object.create(null);
this._events.close = undefined;
this._events.error = undefined;
this._events.prefinish = undefined;
this._events.finish = undefined;
this._events.drain = undefined;
this._eventsCount = 0;
}
this._writableState = new WritableState(options, this, isDuplex);
if (options) {
// Option-supplied implementations shadow the prototype hooks on this instance.
if (typeof options.write === "function") {
this._write = options.write;
}
if (typeof options.writev === "function") {
this._writev = options.writev;
}
if (typeof options.destroy === "function") {
this._destroy = options.destroy;
}
if (typeof options.final === "function") {
this._final = options.final;
}
if (typeof options.construct === "function") {
this._construct = options.construct;
}
}
Stream.call(this, options);
if (options && options.signal) {
addAbortSignalNoValidate(options.signal, this);
}
// Once construction is reported complete, flush anything written in the
// meantime and finish the stream if end() was already requested.
destroyImpl.construct(this, () => {
const state = this._writableState;
if (!state.writing) {
clearBuffer(this, state);
}
finishMaybe(this, state);
});
}
// Make Writable inherit from the legacy Stream base: instances see
// Stream.prototype methods, and the constructor itself inherits Stream's statics.
Object.setPrototypeOf(Writable.prototype, Stream.prototype);
Object.setPrototypeOf(Writable, Stream);
// Custom `instanceof` for Writable: besides regular prototype-chain instances,
// `x instanceof Writable` also accepts any object carrying a WritableState in
// `_writableState`. Subclasses only get the regular prototype-chain check.
Object.defineProperty(Writable, Symbol.hasInstance, {
  value: function (object) {
    const inPrototypeChain = Function.prototype[Symbol.hasInstance].call(this, object);
    if (inPrototypeChain) {
      return true;
    }
    return this === Writable
      ? object && object._writableState instanceof WritableState
      : false;
  },
});
/**
 * Piping *from* a writable stream is not supported; any attempt is reported on
 * the stream as ERR_STREAM_CANNOT_PIPE.
 */
Writable.prototype.pipe = function () {
  const cannotPipe = new ERR_STREAM_CANNOT_PIPE();
  errorOrDestroy(this, cannotPipe);
};
/**
 * Shared implementation behind `write()` and `end(chunk)`.
 *
 * Normalises the optional arguments, validates and (in byte mode) converts the
 * chunk, then either rejects the write or passes it on to `writeOrBuffer`.
 *
 * @param {object} stream Stream being written to.
 * @param {*} chunk Data to write; `null` is never allowed.
 * @param {string|Function|null} [encoding] Encoding of a string chunk, or the callback.
 * @param {Function} [cb] Completion callback; defaults to `nop`.
 * @returns {boolean|Error} The result of `writeOrBuffer`, or — when the stream
 *   is ending or destroyed — the Error that was reported, so that callers can
 *   tell a rejected write apart from back-pressure.
 * @throws {ERR_STREAM_NULL_VALUES} When `chunk` is `null`.
 * @throws {ERR_UNKNOWN_ENCODING} In byte mode, for an unsupported encoding.
 * @throws {TypeError} In byte mode, for a string chunk with encoding "buffer".
 * @throws {ERR_INVALID_ARG_TYPE} In byte mode, for a chunk that is neither a
 *   string, a Buffer, nor an ArrayBuffer view.
 */
function _write(stream, chunk, encoding, cb) {
const state = stream._writableState;
// Support the (chunk, cb) call shape.
if (typeof encoding === "function") {
cb = encoding;
encoding = null;
}
if (typeof cb !== "function") {
cb = nop;
}
if (chunk === null) {
throw new ERR_STREAM_NULL_VALUES();
} else if (!state.objectMode) {
// Byte mode: resolve the encoding, then make sure the chunk is a string or bytes.
if (!encoding) {
encoding = state.defaultEncoding;
} else if (encoding !== "buffer" && !Buffer.isEncoding(encoding)) {
throw new ERR_UNKNOWN_ENCODING(encoding);
}
if (typeof chunk === "string") {
// "buffer" is only meaningful for binary chunks.
if (encoding === "buffer") {
const err = new TypeError('Second argument must be a buffer');
err.code = 'ERR_INVALID_ARG_TYPE';
throw err;
}
if (state.decodeStrings !== false) {
chunk = Buffer.from(chunk, encoding);
encoding = "buffer";
}
} else if (chunk instanceof Buffer) {
encoding = "buffer";
} else if (ArrayBuffer.isView(chunk)) {
// Other typed arrays / DataViews are re-wrapped as a Buffer over the same bytes.
chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
encoding = "buffer";
} else {
throw new ERR_INVALID_ARG_TYPE(
"chunk",
["string", "Buffer", "TypedArray", "DataView"],
chunk,
);
}
}
// Writes after end() or destroy() are reported asynchronously to the callback
// and on the stream, and the error is returned instead of a boolean.
let err;
if (state.ending) {
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED("write");
}
if (err) {
nextTick(cb, err);
errorOrDestroy(stream, err, true);
return err;
}
// One more callback is now owed to the caller.
state.pendingcb++;
return writeOrBuffer(stream, state, chunk, encoding, cb);
}
/**
 * Writes a chunk to the stream.
 *
 * @param {*} chunk Data to write.
 * @param {string|Function} [encoding] Encoding for string chunks, or the callback.
 * @param {Function} [cb] Called once the chunk has been handled or rejected.
 * @returns {boolean} True only when `_write` reported exactly `true`; an Error
 *   result (rejected write) and `false` (back-pressure) both yield false.
 */
Writable.prototype.write = function (chunk, encoding, cb) {
  const outcome = _write(this, chunk, encoding, cb);
  return outcome === true;
};
/**
 * Increases the cork depth by one; while the depth is non-zero, writes are
 * queued instead of being passed to the sink.
 */
Writable.prototype.cork = function () {
  const state = this._writableState;
  state.corked++;
};
/**
 * Undoes one `cork()` call. When no write is in flight, the queue is offered
 * to `clearBuffer`, which flushes it unless the stream is still corked.
 * Calling this on a stream that is not corked does nothing.
 */
Writable.prototype.uncork = function () {
  const state = this._writableState;
  if (!state.corked) {
    return;
  }
  state.corked--;
  if (!state.writing) {
    clearBuffer(this, state);
  }
};
/**
 * Changes the encoding used for string chunks written without an explicit one.
 *
 * @param {string} encoding Encoding name; string values are lower-cased first.
 * @returns {this} The stream, for chaining.
 * @throws {ERR_UNKNOWN_ENCODING} When `Buffer.isEncoding` rejects the value.
 */
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  const normalized = typeof encoding === "string"
    ? encoding.toLowerCase()
    : encoding;
  if (!Buffer.isEncoding(normalized)) {
    throw new ERR_UNKNOWN_ENCODING(normalized);
  }
  this._writableState.defaultEncoding = normalized;
  return this;
};
/**
 * Accounts for a validated chunk and either queues it or hands it straight to
 * the stream's `_write()` hook.
 *
 * @param {object} stream Stream being written to.
 * @param {object} state The stream's WritableState.
 * @param {*} chunk Chunk to write (already validated/converted by `_write`).
 * @param {string} encoding Encoding tag for the chunk ("buffer" for binary data).
 * @param {Function} callback Completion callback for this chunk.
 * @returns {boolean} False when the caller should stop writing: the
 *   high-water mark was reached, or the stream is errored or destroyed.
 */
function writeOrBuffer(stream, state, chunk, encoding, callback) {
// Object-mode chunks each count as one unit.
const len = state.objectMode ? 1 : chunk.length;
state.length += len;
const ret = state.length < state.highWaterMark;
if (!ret) {
// Remember that a 'drain' event is owed once the queue empties.
state.needDrain = true;
}
// Queue the chunk whenever the sink cannot take it right now.
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
// Track whether the queue is homogeneous, so clearBuffer can take shortcuts.
if (state.allBuffers && encoding !== "buffer") {
state.allBuffers = false;
}
if (state.allNoop && callback !== nop) {
state.allNoop = false;
}
} else {
state.writelen = len;
state.writecb = callback;
state.writing = true;
// `sync` brackets the hook call so onwrite() can tell whether the hook
// completed synchronously.
state.sync = true;
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
return ret && !state.errored && !state.destroyed;
}
/**
 * Dispatches one queued write (or one batch) to the sink.
 *
 * Records the in-flight size and callback on `state`, then calls either
 * `_writev` or `_write`. If the stream has been destroyed in the meantime, the
 * write fails immediately through `state.onwrite`. `state.sync` is true for the
 * duration of the dispatch and false afterwards.
 *
 * @param {object} stream Stream whose hook is invoked.
 * @param {object} state The stream's WritableState.
 * @param {boolean} writev Whether `chunk` is a batch for `_writev`.
 * @param {number} len Size accounted for this write.
 * @param {*} chunk Single chunk, or the array of queue entries for a batch.
 * @param {string} encoding Encoding tag (unused for batches).
 * @param {Function} cb Callback to run when the write completes.
 */
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  Object.assign(state, {
    writelen: len,
    writecb: cb,
    writing: true,
    sync: true,
  });
  if (!state.destroyed) {
    if (writev) {
      stream._writev(chunk, state.onwrite);
    } else {
      stream._write(chunk, encoding, state.onwrite);
    }
  } else {
    state.onwrite(new ERR_STREAM_DESTROYED("write"));
  }
  state.sync = false;
}
/**
 * Completes a failed write: settles the owed callback with the error, fails
 * everything still queued, and finally reports the error on the stream.
 *
 * @param {object} stream Stream whose write failed.
 * @param {object} state The stream's WritableState.
 * @param {Error} er The failure reported by the sink.
 * @param {Function} cb Callback of the failed write.
 */
function onwriteError(stream, state, er, cb) {
  // The failed write's callback is about to be delivered.
  state.pendingcb--;
  cb(er);
  errorBuffer(state);
  errorOrDestroy(stream, er);
}
/**
 * Completion handler passed (pre-bound to `stream`) to `_write()`/`_writev()`.
 *
 * Clears the in-flight write bookkeeping, then either routes the error to
 * `onwriteError` or continues with the next queued write and schedules the
 * caller's callback.
 *
 * @param {object} stream Stream whose write completed.
 * @param {Error} [er] Failure reported by the sink, if any.
 */
function onwrite(stream, er) {
const state = stream._writableState;
const sync = state.sync;
const cb = state.writecb;
// writecb is cleared below, so a missing callback means the sink called back twice.
if (typeof cb !== "function") {
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}
state.writing = false;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;
if (er) {
// NOTE(review): the bare property read presumably forces the stack trace to
// be materialised now — confirm against upstream before removing.
er.stack;
if (!state.errored) {
state.errored = er;
}
// On a duplex, mark the readable side as errored too.
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er;
}
// A synchronous failure is deferred so the callback never runs inside write().
if (sync) {
nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
// Keep the sink busy with whatever was queued in the meantime.
if (state.buffered.length > state.bufferedIndex) {
clearBuffer(stream, state);
}
if (sync) {
// Consecutive synchronous completions that share a callback are counted
// and delivered together from a single tick.
if (
state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb
) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
nextTick(afterWriteTick, state.afterWriteTickInfo);
}
} else {
afterWrite(stream, state, 1, cb);
}
}
}
/**
 * Tick callback that delivers a batch of coalesced write completions.
 *
 * Clears the pending batch record on the state first, so the next synchronous
 * completion starts a new batch, then forwards to `afterWrite`.
 *
 * @param {{stream: object, state: object, count: number, cb: Function}} info
 *   Batch record created by `onwrite`.
 * @returns {*} Whatever `afterWrite` returns.
 */
function afterWriteTick(info) {
  const { stream, state, count, cb } = info;
  state.afterWriteTickInfo = null;
  return afterWrite(stream, state, count, cb);
}
/**
 * Post-processing for one or more successfully completed writes.
 *
 * Emits 'drain' when it is owed and the queue is empty, invokes the completion
 * callback once per completed write, fails the remaining queue if the stream
 * was destroyed, and finally checks whether the stream can finish.
 *
 * @param {object} stream Stream the writes belong to.
 * @param {object} state The stream's WritableState.
 * @param {number} count Number of completions to deliver.
 * @param {Function} cb Callback shared by those completions.
 */
function afterWrite(stream, state, count, cb) {
  const drainOwed = !state.ending &&
    !stream.destroyed &&
    state.length === 0 &&
    state.needDrain;
  if (drainOwed) {
    state.needDrain = false;
    stream.emit("drain");
  }
  for (let remaining = count; remaining > 0; remaining--) {
    state.pendingcb--;
    cb(null);
  }
  if (state.destroyed) {
    errorBuffer(state);
  }
  finishMaybe(stream, state);
}
/**
 * Fails every queued write and every waiting `end()` callback.
 *
 * Does nothing while a write is still in flight. Each failed entry is removed
 * from the length accounting and its callback receives the stream's recorded
 * error, or a fresh ERR_STREAM_DESTROYED when none was recorded. The queue is
 * reset afterwards.
 *
 * @param {object} state WritableState whose queue should be failed.
 */
function errorBuffer(state) {
  if (state.writing) {
    return;
  }
  // The queue is re-read on every pass because a callback may append to it.
  let index = state.bufferedIndex;
  while (index < state.buffered.length) {
    const entry = state.buffered[index];
    const chunk = entry.chunk;
    const callback = entry.callback;
    const size = state.objectMode ? 1 : chunk.length;
    state.length -= size;
    callback(state.errored ?? new ERR_STREAM_DESTROYED("write"));
    index += 1;
  }
  for (const onFinished of state[kOnFinished].splice(0)) {
    onFinished(state.errored ?? new ERR_STREAM_DESTROYED("end"));
  }
  resetBuffer(state);
}
/**
 * Flushes queued writes to the sink.
 *
 * With more than one queued entry and a `_writev` hook, the whole queue is sent
 * as one batch. Otherwise entries are written one at a time until the sink
 * stops completing synchronously.
 *
 * @param {object} stream Stream whose queue is flushed.
 * @param {object} state The stream's WritableState.
 */
function clearBuffer(stream, state) {
// Nothing to do while corked, already flushing, destroyed, or not yet constructed.
if (
state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed
) {
return;
}
const { buffered, bufferedIndex, objectMode } = state;
const bufferedLength = buffered.length - bufferedIndex;
if (!bufferedLength) {
return;
}
let i = bufferedIndex;
state.bufferProcessing = true;
if (bufferedLength > 1 && stream._writev) {
// The batch completes through a single callback, so only one of the queued
// entries stays counted in pendingcb.
state.pendingcb -= bufferedLength - 1;
// Fan the batch result out to each entry's callback, unless all are no-ops.
const callback = state.allNoop ? nop : (err) => {
for (let n = i; n < buffered.length; ++n) {
buffered[n].callback(err);
}
};
// Hand over the queue array itself when that is safe, avoiding a copy.
const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
chunks.allBuffers = state.allBuffers;
doWrite(stream, state, true, state.length, chunks, "", callback);
resetBuffer(state);
} else {
// Write entries one by one; stop as soon as a write stays in flight.
do {
const { chunk, encoding, callback } = buffered[i];
// Drop the reference so the written chunk is no longer held by the queue.
buffered[i++] = null;
const len = objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, callback);
} while (i < buffered.length && !state.writing);
if (i === buffered.length) {
resetBuffer(state);
} else if (i > 256) {
// Compact the array once the consumed prefix has grown large.
buffered.splice(0, i);
state.bufferedIndex = 0;
} else {
state.bufferedIndex = i;
}
}
state.bufferProcessing = false;
}
/**
 * Default `_write` hook. A stream that only implements `_writev` gets single
 * chunks forwarded as a one-element batch; a stream that implements neither
 * hook cannot be written to.
 *
 * @param {*} chunk Chunk to write.
 * @param {string} encoding Encoding tag for the chunk.
 * @param {Function} cb Completion callback.
 * @throws {ERR_METHOD_NOT_IMPLEMENTED} When no `_writev` hook exists either.
 */
Writable.prototype._write = function (chunk, encoding, cb) {
  if (!this._writev) {
    throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
  }
  this._writev([{ chunk, encoding }], cb);
};
// No batch hook by default; clearBuffer() and the default _write() test this
// for truthiness to decide whether batched writes are available.
Writable.prototype._writev = null;
/**
 * Signals that no more data will be written, optionally writing a final chunk.
 *
 * @param {*} [chunk] Final chunk to write, or the callback.
 * @param {string|Function} [encoding] Encoding of a string chunk, or the callback.
 * @param {Function} [cb] Called once the stream has finished, or with an error
 *   if it cannot finish.
 * @returns {this} The stream, for chaining.
 */
Writable.prototype.end = function (chunk, encoding, cb) {
const state = this._writableState;
// Support the (cb) and (chunk, cb) call shapes.
if (typeof chunk === "function") {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === "function") {
cb = encoding;
encoding = null;
}
let err;
if (chunk !== null && chunk !== undefined) {
// _write() returns the Error object itself when the write was rejected.
const ret = _write(this, chunk, encoding);
if (ret instanceof Error) {
err = ret;
}
}
// Ending a corked stream uncorks it fully, whatever the nesting depth.
if (state.corked) {
state.corked = 1;
this.uncork();
}
if (err) {
// The final write was rejected; its error is delivered to `cb` below.
} else if (!state.errored && !state.ending) {
state.ending = true;
finishMaybe(this, state, true);
state.ended = true;
} else if (state.finished) {
err = new ERR_STREAM_ALREADY_FINISHED("end");
} else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED("end");
}
if (typeof cb === "function") {
// Settle the callback asynchronously when the outcome is already known;
// otherwise park it until finish() or a failure path drains the list.
if (err || state.finished) {
nextTick(cb, err);
} else if (state.errored) {
nextTick(cb, state.errored);
} else {
state[kOnFinished].push(cb);
}
}
return this;
};
/**
 * Tells whether the stream is ready to finish: `end()` was requested,
 * construction is complete, nothing is queued or in flight, no error occurred,
 * and neither 'finish', 'error' nor 'close' processing has already happened.
 *
 * @param {object} state WritableState to inspect.
 * @returns {*} Truthy when the stream can finish. As with a plain `&&` chain,
 *   a falsy `ending` or `constructed` value is returned as-is.
 */
function needFinish(state) {
  const ending = state.ending;
  if (!ending) {
    return ending;
  }
  const constructed = state.constructed;
  if (!constructed) {
    return constructed;
  }
  const idle = state.length === 0 &&
    !state.errored &&
    state.buffered.length === 0;
  if (!idle) {
    return false;
  }
  return !state.finished &&
    !state.writing &&
    !state.errorEmitted &&
    !state.closeEmitted;
}
/**
 * Invokes the stream's `_final()` hook and connects its completion to the
 * finish sequence.
 *
 * The hook may complete by calling the callback it receives or by returning a
 * thenable. An exception thrown by the hook is treated as a failed completion.
 * While the hook is pending, one extra `pendingcb` is held so the stream cannot
 * finish early.
 *
 * @param {object} stream Stream whose `_final()` hook is called.
 * @param {object} state The stream's WritableState.
 */
function callFinal(stream, state) {
  let called = false;

  function onFinish(err) {
    if (called) {
      // A second completion is a contract violation. The error is built with
      // `new`, like every other error in this file, so this also works when
      // the error types are classes (which cannot be called as plain functions).
      errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
      return;
    }
    called = true;
    state.pendingcb--;
    if (err) {
      // Fail every waiting end() callback before reporting on the stream.
      const onfinishCallbacks = state[kOnFinished].splice(0);
      for (let i = 0; i < onfinishCallbacks.length; i++) {
        onfinishCallbacks[i](err);
      }
      errorOrDestroy(stream, err, state.sync);
    } else if (needFinish(state)) {
      state.prefinished = true;
      stream.emit("prefinish");
      // Hold a pending callback until finish() runs on a later tick.
      state.pendingcb++;
      nextTick(finish, stream, state);
    }
  }

  // `sync` lets the error path know whether it runs inside the hook call.
  state.sync = true;
  state.pendingcb++;
  try {
    const result = stream._final(onFinish);
    if (result != null) {
      // Promise-returning hooks complete through their settlement.
      const then = result.then;
      if (typeof then === "function") {
        then.call(
          result,
          function () {
            nextTick(onFinish, null);
          },
          function (err) {
            nextTick(onFinish, err);
          },
        );
      }
    }
  } catch (err) {
    onFinish(err);
  }
  state.sync = false;
}
/**
 * Runs the pre-finish step at most once per stream.
 *
 * When the stream has a `_final` hook and is not destroyed, the hook is started
 * via `callFinal` (which emits 'prefinish' itself on success). Otherwise
 * 'prefinish' is emitted right away.
 *
 * @param {object} stream Stream that is about to finish.
 * @param {object} state The stream's WritableState.
 */
function prefinish(stream, state) {
  if (state.prefinished || state.finalCalled) {
    return;
  }
  if (typeof stream._final === "function" && !state.destroyed) {
    state.finalCalled = true;
    callFinal(stream, state);
    return;
  }
  state.prefinished = true;
  stream.emit("prefinish");
}
/**
 * Finishes the stream if it is ready to.
 *
 * Runs the pre-finish step first. If the stream is still ready afterwards and
 * no callbacks are owed, `finish` is run — on a later tick when `sync` is
 * truthy, immediately otherwise. One pending callback is held until it runs.
 *
 * @param {object} stream Stream to finish.
 * @param {object} state The stream's WritableState.
 * @param {boolean} [sync] Whether the caller is on a synchronous user call path.
 */
function finishMaybe(stream, state, sync) {
  if (!needFinish(state)) {
    return;
  }
  prefinish(stream, state);
  if (state.pendingcb !== 0 || !needFinish(state)) {
    return;
  }
  state.pendingcb++;
  if (sync) {
    nextTick(finish, stream, state);
  } else {
    finish(stream, state);
  }
}
/**
 * Completes the stream: releases the pending callback held for this step,
 * marks the state finished, settles every waiting `end()` callback, and emits
 * 'finish'. With `autoDestroy` the stream is then destroyed — for a stream that
 * also has a readable side, only when that side auto-destroys too and has
 * either emitted 'end' or is flagged as not readable.
 *
 * @param {object} stream Stream to finish.
 * @param {object} state The stream's WritableState.
 */
function finish(stream, state) {
  state.pendingcb--;
  state.finished = true;
  for (const onFinished of state[kOnFinished].splice(0)) {
    onFinished(null);
  }
  stream.emit("finish");
  if (!state.autoDestroy) {
    return;
  }
  const readableSide = stream._readableState;
  const readableSideDone = !readableSide || (
    readableSide.autoDestroy &&
    (readableSide.endEmitted || readableSide.readable === false)
  );
  if (readableSideDone) {
    stream.destroy();
  }
}
// Public accessors over the writable state. Unless noted otherwise, each getter
// tolerates a missing `_writableState` and falls back to a neutral value.
Object.defineProperties(Writable.prototype, {
// Whether the stream has been destroyed; writable so the flag can be set externally.
destroyed: {
get() {
return this._writableState ? this._writableState.destroyed : false;
},
set(value) {
if (this._writableState) {
this._writableState.destroyed = value;
}
},
},
// True while write() can still be called: not destroyed, errored, ending or
// ended, and not explicitly switched off through the setter below.
writable: {
get() {
const w = this._writableState;
return !!w && w.writable !== false && !w.destroyed && !w.errored &&
!w.ending && !w.ended;
},
set(val) {
if (this._writableState) {
this._writableState.writable = !!val;
}
},
},
// True once finish() has run.
writableFinished: {
get() {
return this._writableState ? this._writableState.finished : false;
},
},
writableObjectMode: {
get() {
return this._writableState ? this._writableState.objectMode : false;
},
},
// Copy of the queue entries not yet handed to the sink.
writableBuffer: {
get() {
return this._writableState && this._writableState.getBuffer();
},
},
// Reflects the `ending` flag, i.e. becomes true as soon as end() is accepted,
// before the stream has actually finished.
writableEnded: {
get() {
return this._writableState ? this._writableState.ending : false;
},
},
// True when a 'drain' event is owed and the stream is neither destroyed nor ending.
writableNeedDrain: {
get() {
const wState = this._writableState;
if (!wState) return false;
return !wState.destroyed && !wState.ending && wState.needDrain;
},
},
writableHighWaterMark: {
get() {
return this._writableState && this._writableState.highWaterMark;
},
},
// Current cork() nesting depth.
writableCorked: {
get() {
return this._writableState ? this._writableState.corked : 0;
},
},
// Total size of accepted chunks that the sink has not acknowledged yet.
writableLength: {
get() {
return this._writableState && this._writableState.length;
},
},
// True when the stream was destroyed or errored before it finished.
// NOTE(review): unlike the other getters this one dereferences
// `_writableState` without a guard and throws when it is missing — confirm
// that is intended.
writableAborted: {
enumerable: false,
get: function () {
return !!(this._writableState.writable !== false &&
(this._writableState.destroyed || this._writableState.errored) &&
!this._writableState.finished);
},
},
// First error recorded for the stream, or null.
errored: {
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
},
},
closed: {
get() {
return this._writableState ? this._writableState.closed : false;
},
},
});
// Shared destroy routine; Writable.prototype.destroy below invokes it with the
// stream as `this` after scheduling the failure of queued work.
const destroy = destroyImpl.destroy;
/**
 * Destroys the stream.
 *
 * On the first call, if writes are still queued or `end()` callbacks are still
 * waiting, their failure is scheduled for a later tick before the shared
 * destroy routine runs.
 *
 * @param {Error} [err] Reason for destroying the stream.
 * @param {Function} [cb] Passed through to the shared destroy routine.
 * @returns {this} The stream, for chaining.
 */
Writable.prototype.destroy = function (err, cb) {
  const state = this._writableState;
  if (!state.destroyed) {
    const hasQueuedWrites = state.bufferedIndex < state.buffered.length;
    if (hasQueuedWrites || state[kOnFinished].length) {
      nextTick(errorBuffer, state);
    }
  }
  destroy.call(this, err, cb);
  return this;
};
// Expose the shared undestroy routine from the destroy implementation as a method.
Writable.prototype._undestroy = destroyImpl.undestroy;
/**
 * Default `_destroy` hook: performs no cleanup and simply passes the error on.
 *
 * @param {Error} [err] Reason the stream is being destroyed.
 * @param {Function} cb Continuation; receives `err` unchanged.
 */
Writable.prototype._destroy = function (err, cb) {
cb(err);
};
// Handler stored under EventEmitter.captureRejectionSymbol: the received error
// destroys the stream.
Writable.prototype[EventEmitter.captureRejectionSymbol] = function (err) {
this.destroy(err);
};
/**
 * Wraps a WHATWG WritableStream in a Writable.
 *
 * @param {WritableStream} writableStream Web stream to write into; it is locked
 *   by acquiring its writer.
 * @param {object} [options]
 * @param {number} [options.highWaterMark] Forwarded to the Writable.
 * @param {boolean} [options.decodeStrings=true] Forwarded to the Writable.
 * @param {boolean} [options.objectMode=false] Forwarded to the Writable.
 * @param {AbortSignal} [options.signal] Forwarded to the Writable.
 * @returns {Writable} Stream whose writes, end and destroy are relayed to the writer.
 * @throws {ERR_INVALID_ARG_TYPE} When `writableStream` is not a WritableStream.
 *   Invalid `options` are rejected by the validators.
 */
function newStreamWritableFromWritableStream(writableStream, options = {}) {
if (!(writableStream instanceof WritableStream)) {
throw new ERR_INVALID_ARG_TYPE(
'writableStream',
'WritableStream',
writableStream);
}
validateObject(options, 'options');
const {
highWaterMark,
decodeStrings = true,
objectMode = false,
signal,
} = options;
validateBoolean(objectMode, 'options.objectMode');
validateBoolean(decodeStrings, 'options.decodeStrings');
const writer = writableStream.getWriter();
// Set once writer.closed has settled, either way.
let closed = false;
const writable = new Writable({
highWaterMark,
objectMode,
decodeStrings,
signal,
write(chunk, encoding, callback) {
// Strings reaching this point are converted to a plain Uint8Array over the
// encoded bytes before being handed to the web stream.
if (typeof chunk === 'string' && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
}
function done(error) {
try {
callback(error);
} catch (error) {
// An exception thrown by the callback destroys the stream instead of
// escaping into the promise chain.
destroyImpl.destroyer(writable, error);
}
}
// Respect the web stream's back-pressure: wait for `ready`, then write.
writer.ready.then(
() => {
return writer.write(chunk).then(done, done);
},
done);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// Rethrow outside the promise chain so the exception is not swallowed
// as a rejection.
nextTick(() => { throw error; });
}
}
if (!closed) {
// Abort the web stream on error, close it otherwise; either outcome
// completes the destroy with the original `error`.
if (error != null) {
writer.abort(error).then(done, done);
} else {
writer.close().then(done, done);
}
return;
}
done();
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
nextTick(() => { destroyImpl.destroyer(writable, error); });
}
}
// NOTE(review): when the writer has already closed, `callback` is never
// invoked here; the writer.closed handlers below destroy the stream in that
// case — confirm that this always covers it.
if (!closed) {
writer.close().then(done, done);
}
},
});
// Mirror the web stream's closure onto the Writable.
writer.closed.then(
() => {
closed = true;
// The web stream closed before end() was called on the Writable.
if (!isWritableEnded(writable))
destroyImpl.destroyer(writable, new ERR_STREAM_PREMATURE_CLOSE());
},
(error) => {
closed = true;
destroyImpl.destroyer(writable, error);
});
return writable;
}
/**
 * Wraps a Writable-like stream in a WHATWG WritableStream.
 *
 * @param {object} streamWritable Object with `write` and `on` methods.
 * @returns {WritableStream} Web stream that relays writes, close and abort to
 *   `streamWritable`. When the source is already destroyed or not writable, an
 *   empty WritableStream on which `close()` has been called is returned.
 * @throws {ERR_INVALID_ARG_TYPE} When `streamWritable` lacks `write` or `on`.
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  if (typeof streamWritable?.write !== 'function' ||
      typeof streamWritable?.on !== 'function') {
    throw new ERR_INVALID_ARG_TYPE(
      'streamWritable',
      'stream.Writable',
      streamWritable);
  }
  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }
  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy = streamWritable.writableObjectMode
    ? new CountQueuingStrategy({ highWaterMark })
    : { highWaterMark };
  // Controller of the web stream; cleared once the bridge is done with it.
  let controller;
  // Settlers of the promise returned from write() while back-pressure applies.
  let backpressureResolve;
  let backpressureReject;

  function onDrain() {
    if (backpressureResolve !== undefined) {
      backpressureResolve();
      backpressureResolve = undefined;
      backpressureReject = undefined;
    }
  }

  const cleanup = eos(streamWritable, (error) => {
    cleanup();
    // Swallow any error event emitted after end-of-stream was already reported.
    streamWritable.on('error', () => {});
    if (error != null) {
      if (backpressureReject !== undefined) {
        backpressureReject(error);
        backpressureResolve = undefined;
        backpressureReject = undefined;
      }
      // close() may already have released the controller (source stream ended
      // before close was requested); there is nothing left to error then.
      controller?.error(error);
      controller = undefined;
      return;
    }
    controller?.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on('drain', onDrain);

  return new WritableStream({
    start(c) { controller = c; },
    write(chunk) {
      // Under back-pressure, hold the web stream until 'drain' or a failure.
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        return new Promise((resolve, reject) => {
          backpressureResolve = resolve;
          backpressureReject = reject;
        }).finally(() => {
          backpressureResolve = undefined;
          backpressureReject = undefined;
        });
      }
    },
    abort(reason) {
      destroyImpl.destroyer(streamWritable, reason);
    },
    close() {
      if (!isWritableEnded(streamWritable)) {
        // End the source stream and settle once it has finished or failed.
        return new Promise((resolve, reject) => {
          streamWritable.end();
          eos(streamWritable, (err) => {
            if (err) reject(err);
            else resolve();
          });
        });
      }
      controller = undefined;
      return Promise.resolve();
    },
  }, strategy);
}
/**
 * Static entry point: wraps a web WritableStream in a Writable.
 * Delegates to newStreamWritableFromWritableStream.
 */
Writable.fromWeb = function(writableStream, options) {
return newStreamWritableFromWritableStream(writableStream, options);
};
/**
 * Static entry point: wraps a Writable in a web WritableStream.
 * Delegates to newWritableStreamFromStreamWritable.
 */
Writable.toWeb = function(streamWritable) {
return newWritableStreamFromStreamWritable(streamWritable);
};
// Expose the state constructor as a static property of Writable.
Writable.WritableState = WritableState;
// Install an async-dispose method only on runtimes that define Symbol.asyncDispose.
if (typeof Symbol.asyncDispose !== 'undefined') {
/**
 * Destroys the stream if it is not destroyed yet — with an AbortError unless
 * it has already finished — and then waits for end-of-stream.
 *
 * @returns {Promise<void>} Settles once end-of-stream is reported. Errors
 *   named 'AbortError' are treated as a normal completion; any other error
 *   rejects.
 */
Writable.prototype[Symbol.asyncDispose] = async function() {
let error;
if (!this.destroyed) {
error = this.writableFinished ? null : new AbortError();
this.destroy(error);
}
await new Promise((resolve, reject) =>
eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))),
);
};
}
export default Writable;
export { Writable, WritableState, newStreamWritableFromWritableStream as fromWeb, newWritableStreamFromStreamWritable as toWeb };