import { addAbortSignalNoValidate } from "__wasm_rquickjs_builtin/internal/streams/add-abort-signal";
import { Buffer } from "buffer";
import { debuglog } from "__wasm_rquickjs_builtin/internal/util/debuglog";
import { getDefaultHighWaterMark, getHighWaterMark } from "__wasm_rquickjs_builtin/internal/streams/state";
import { prependListener, Stream } from "__wasm_rquickjs_builtin/internal/streams/legacy";
import { StringDecoder } from "string_decoder";
import { validateObject, validateBoolean, validateAbortSignal, validateInteger } from "__wasm_rquickjs_builtin/internal/validators";
import {
AbortError,
aggregateTwoErrors,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_OUT_OF_RANGE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
ERR_UNKNOWN_ENCODING,
} from "__wasm_rquickjs_builtin/internal/errors";
import _from from "__wasm_rquickjs_builtin/internal/streams/from";
import BufferList from "__wasm_rquickjs_builtin/internal/streams/buffer_list";
import destroyImpl from "__wasm_rquickjs_builtin/internal/streams/destroy";
import EventEmitter from "events";
import { nextTick } from "node:process";
import { isDestroyed, isReadable } from "__wasm_rquickjs_builtin/internal/streams/utils";
import eos from "__wasm_rquickjs_builtin/internal/streams/end-of-stream";
// Debug logger for the "stream" section. The callback handed to debuglog
// replaces `debug` with the function it receives.
let debug = debuglog("stream", (fn) => {
debug = fn;
});
// Symbol key under which ReadableState stores its explicit paused flag.
const kPaused = Symbol("kPaused");
// Shared no-op callback.
const nop = () => { };
// Error-reporting helper taken from the shared destroy implementation.
const { errorOrDestroy } = destroyImpl;
/**
 * Bookkeeping record attached to every Readable as `_readableState`.
 *
 * @param {object} [options] Stream options (objectMode, readableObjectMode,
 *   highWaterMark, emitClose, autoDestroy, defaultEncoding, encoding).
 * @param {object} stream The owning stream; only inspected to detect a Duplex
 *   when `isDuplex` is not a boolean.
 * @param {boolean} [isDuplex] Whether the owner is a Duplex stream.
 * @throws {ERR_UNKNOWN_ENCODING} When the default encoding is not supported.
 */
function ReadableState(options, stream, isDuplex) {
  const duplex = typeof isDuplex === "boolean"
    ? isDuplex
    : stream instanceof Stream.Duplex;

  // Duplex streams may enable object mode for the readable side only.
  this.objectMode = Boolean(options?.objectMode);
  if (duplex && !this.objectMode) {
    this.objectMode = Boolean(options?.readableObjectMode);
  }

  this.highWaterMark = options
    ? getHighWaterMark(this, options, "readableHighWaterMark", duplex)
    : getDefaultHighWaterMark(false);

  // Buffered data and the destinations it is piped to.
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = [];

  // Flow-control and lifecycle flags.
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;
  this.constructed = true;
  this.sync = true;
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this[kPaused] = null;
  this.errorEmitted = false;

  // Both options default to true; only an explicit `false` disables them.
  this.emitClose = !options || options.emitClose !== false;
  this.autoDestroy = !options || options.autoDestroy !== false;

  this.destroyed = false;
  this.errored = null;
  this.closed = false;
  this.closeEmitted = false;

  this.defaultEncoding = options?.defaultEncoding || "utf8";
  if (!Buffer.isEncoding(this.defaultEncoding)) {
    throw new ERR_UNKNOWN_ENCODING(this.defaultEncoding);
  }

  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;
  this.readingMore = false;
  this.dataEmitted = false;

  // A decoder is only created when an encoding was requested up front.
  const hasEncoding = Boolean(options?.encoding);
  this.decoder = hasEncoding ? new StringDecoder(options.encoding) : null;
  this.encoding = hasEncoding ? options.encoding : null;
}
/**
 * Readable stream constructor; works with or without `new`.
 *
 * Creates `_readableState`, installs `options.read`, `options.destroy` and
 * `options.construct` as `_read`, `_destroy` and `_construct`, runs the
 * `Stream` constructor, wires `options.signal` (non-duplex streams only) and
 * finally hands the instance to `destroyImpl.construct`.
 *
 * @param {object} [options] Stream options.
 */
function Readable(options) {
// Allow calling the constructor without `new`.
if (!(this instanceof Readable)) {
return new Readable(options);
}
// Pre-create the listener table with the common event keys when the
// instance does not have one yet.
if (!this._events) {
this._events = Object.create(null);
this._events.close = undefined;
this._events.error = undefined;
this._events.data = undefined;
this._events.end = undefined;
this._events.readable = undefined;
this._eventsCount = 0;
}
const isDuplex = this instanceof Stream.Duplex;
this._readableState = new ReadableState(options, this, isDuplex);
// Implementation hooks may be supplied through the options object.
if (options) {
if (typeof options.read === "function") {
this._read = options.read;
}
if (typeof options.destroy === "function") {
this._destroy = options.destroy;
}
if (typeof options.construct === "function") {
this._construct = options.construct;
}
}
Stream.call(this, options);
// The abort signal is only attached here for non-duplex streams.
if (options && options.signal && !isDuplex) {
addAbortSignalNoValidate(options.signal, this);
}
// The callback starts a read-ahead if a read was requested in the meantime.
destroyImpl.construct(this, () => {
if (this._readableState.needReadable) {
maybeReadMore(this, this._readableState);
}
});
}
// Readable inherits instance methods and statics from the legacy Stream.
Object.setPrototypeOf(Readable.prototype, Stream.prototype);
Object.setPrototypeOf(Readable, Stream);
// destroy/_undestroy come from the shared destroy implementation.
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
// Default _destroy: just forwards the error to the callback.
Readable.prototype._destroy = function (err, cb) {
cb(err);
};
// Installed under EventEmitter.captureRejectionSymbol: destroys the stream
// with the error it is given.
Readable.prototype[EventEmitter.captureRejectionSymbol] = function (err) {
this.destroy(err);
};
/**
 * Appends a chunk to the end of the read buffer.
 * @returns {boolean} Whatever `readableAddChunk` reports.
 */
Readable.prototype.push = function (chunk, encoding) {
  const addToFront = false;
  return readableAddChunk(this, chunk, encoding, addToFront);
};
/**
 * Puts a chunk back at the front of the read buffer.
 * @returns {boolean} Whatever `readableAddChunk` reports.
 */
Readable.prototype.unshift = function (chunk, encoding) {
  const addToFront = true;
  return readableAddChunk(this, chunk, encoding, addToFront);
};
/**
 * Shared implementation of `push()` and `unshift()`.
 *
 * Outside object mode the chunk is normalised first: strings are converted
 * to a Buffer unless their encoding already matches the stream encoding,
 * other ArrayBuffer views are converted through `Stream._uint8ArrayToBuffer`,
 * and any other non-nullish value is reported as ERR_INVALID_ARG_TYPE.
 *
 * @param {object} stream The Readable receiving the chunk.
 * @param {*} chunk Data to add; `null` signals end-of-stream.
 * @param {string} [encoding] Encoding of `chunk` when it is a string.
 * @param {boolean} addToFront `true` for unshift, `false` for push.
 * @returns {boolean} `true` while more data may be added (not ended and the
 *   buffer is below the high-water mark or empty); `false` otherwise.
 */
function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug("readableAddChunk", chunk);
  const state = stream._readableState;
  let err;
  if (!state.objectMode) {
    if (typeof chunk === "string") {
      encoding = encoding || state.defaultEncoding;
      if (state.encoding !== encoding) {
        if (addToFront && state.encoding) {
          // When unshifting into a stream with an encoding, the string must
          // be stored in the stream's own encoding.
          chunk = Buffer.from(chunk, encoding).toString(state.encoding);
        } else {
          chunk = Buffer.from(chunk, encoding);
          encoding = "";
        }
      }
    } else if (chunk instanceof Buffer) {
      encoding = "";
    } else if (ArrayBuffer.isView(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = "";
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE(
        "chunk",
        ["string", "Buffer", "TypedArray", "DataView"],
        chunk,
      );
    }
  }
  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    if (addToFront) {
      if (state.endEmitted) {
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      } else if (state.destroyed || state.errored) {
        // Same rule as push(): a destroyed or errored stream must not
        // accumulate further data.
        return false;
      } else {
        addChunk(stream, state, chunk, true);
      }
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed || state.errored) {
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0) {
          addChunk(stream, state, chunk, false);
        } else {
          // The decoder swallowed the bytes (incomplete character).
          maybeReadMore(stream, state);
        }
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    // Empty chunk pushed: the read is over, try to fetch more.
    state.reading = false;
    maybeReadMore(stream, state);
  }
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}
/**
 * Delivers a chunk: emitted straight away as 'data' when the stream is
 * flowing with an empty buffer, not in a synchronous read, and has 'data'
 * listeners; otherwise stored in the buffer. Always schedules a read-ahead.
 */
function addChunk(stream, state, chunk, addToFront) {
  const deliverNow = state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount("data") > 0;

  if (!deliverNow) {
    // Buffer path: account for the chunk, store it, announce readability.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
    if (state.needReadable) {
      emitReadable(stream);
    }
  } else {
    // Fast path: reset drain bookkeeping and hand the chunk to listeners.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    state.dataEmitted = true;
    stream.emit("data", chunk);
  }

  maybeReadMore(stream, state);
}
/**
 * @returns {boolean} `true` when the stream was explicitly paused or its
 *   flowing flag is exactly `false`.
 */
Readable.prototype.isPaused = function () {
  const readableState = this._readableState;
  if (readableState[kPaused] === true) {
    return true;
  }
  return readableState.flowing === false;
};
/**
 * Installs a StringDecoder for `enc` and re-encodes everything already
 * buffered into a single string chunk.
 *
 * @param {string} enc Encoding passed to StringDecoder.
 * @returns {this}
 */
Readable.prototype.setEncoding = function (enc) {
  const state = this._readableState;
  const decoder = new StringDecoder(enc);
  state.decoder = decoder;
  // The decoder reports the encoding it actually uses.
  state.encoding = decoder.encoding;

  // Convert what is already buffered.
  const pending = state.buffer;
  const pieces = [];
  for (const data of pending) {
    pieces.push(decoder.write(data));
  }
  const content = pieces.join("");

  pending.clear();
  if (content !== "") {
    pending.push(content);
  }
  state.length = content.length;
  return this;
};
// Largest high-water mark read(n) may raise the stream to (1 GiB).
const MAX_HWM = 0x40000000;
/**
 * Rounds `n` up to the next power of two, capped at MAX_HWM.
 *
 * @param {number} n Requested size.
 * @returns {number} The new high-water mark.
 * @throws {ERR_OUT_OF_RANGE} When `n` exceeds MAX_HWM.
 */
function computeNewHighWaterMark(n) {
  if (n > MAX_HWM) {
    throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n);
  }
  if (n === MAX_HWM) {
    return MAX_HWM;
  }
  // Smear the highest set bit of n - 1 into every lower position, then add one.
  let rounded = n - 1;
  for (let shift = 1; shift <= 16; shift *= 2) {
    rounded |= rounded >>> shift;
  }
  return rounded + 1;
}
/**
 * Decides how much a `read(n)` call may take from the buffer.
 *
 * @param {number} n Requested amount; NaN means "whatever is available".
 * @param {object} state The stream's readable state.
 * @returns {number} Number of bytes (or 1 object) to hand out, 0 for none.
 */
function howMuchToRead(n, state) {
  const drained = state.length === 0 && state.ended;
  if (n <= 0 || drained) {
    return 0;
  }
  if (state.objectMode) {
    return 1;
  }
  if (Number.isNaN(n)) {
    // No size given: in flowing mode hand out one buffered chunk at a time,
    // otherwise everything that is buffered.
    const oneChunk = state.flowing && state.length;
    return oneChunk ? state.buffer.first().length : state.length;
  }
  if (n <= state.length) {
    return n;
  }
  // Not enough buffered: only a finished stream gives up its remainder.
  if (state.ended) {
    return state.length;
  }
  return 0;
}
/**
 * Pulls data out of the internal buffer, calling `_read()` to refill it when
 * the buffer is empty or would drop below the high-water mark.
 *
 * @param {number} [n] Amount to read; `undefined` is treated as NaN, and
 *   non-integer values go through `Number.parseInt`.
 * @returns {*} The data read, or `null` when nothing is returned.
 */
Readable.prototype.read = function (n) {
debug("read", n);
if (n === undefined) {
n = NaN;
} else if (!Number.isInteger(n)) {
n = Number.parseInt(n, 10);
}
const state = this._readableState;
const nOrig = n;
// Asking for more than the current high-water mark raises the mark.
if (n > state.highWaterMark) {
state.highWaterMark = computeNewHighWaterMark(n);
}
if (n !== 0) {
state.emittedReadable = false;
}
// read(0) with a pending readable request and either enough buffered data
// or an ended stream: only trigger 'readable' (or the end sequence).
if (
n === 0 &&
state.needReadable &&
((state.highWaterMark !== 0
? state.length >= state.highWaterMark
: state.length > 0) ||
state.ended)
) {
debug("read: emitReadable", state.length, state.ended);
if (state.length === 0 && state.ended) {
endReadable(this);
} else {
emitReadable(this);
}
return null;
}
n = howMuchToRead(n, state);
// Nothing to hand out and the source has ended.
if (n === 0 && state.ended) {
if (state.length === 0) {
endReadable(this);
}
return null;
}
// Decide whether the underlying _read() has to be called.
let doRead = state.needReadable;
debug("need readable", doRead);
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug("length less than watermark", doRead);
}
// Never call _read() while ended, already reading, destroyed, errored or
// not yet constructed.
if (
state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed
) {
doRead = false;
debug("reading, ended or constructing", doRead);
} else if (doRead) {
debug("do read");
state.reading = true;
state.sync = true;
if (state.length === 0) {
state.needReadable = true;
}
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.sync = false;
// `reading` was cleared during the _read() call, so the buffer may have
// changed: recompute the amount.
if (!state.reading) {
n = howMuchToRead(nOrig, state);
}
}
let ret;
if (n > 0) {
ret = fromList(n, state);
} else {
ret = null;
}
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
n = 0;
} else {
state.length -= n;
// Data was consumed, so reset the drain bookkeeping.
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
}
if (state.length === 0) {
// Buffer is empty: ask for 'readable' unless the source has ended.
if (!state.ended) {
state.needReadable = true;
}
// The request was not fully served and the source has ended.
if (nOrig !== n && state.ended) {
endReadable(this);
}
}
if (ret !== null) {
state.dataEmitted = true;
this.emit("data", ret);
}
return ret;
};
/**
 * Handles the end-of-stream marker: flushes the decoder into the buffer,
 * marks the state as ended and announces readability (deferred when called
 * during a synchronous read, immediately otherwise).
 */
function onEofChunk(stream, state) {
  debug("onEofChunk");
  if (state.ended) {
    return;
  }

  // Whatever the decoder still holds becomes the final chunk.
  const tail = state.decoder ? state.decoder.end() : null;
  if (tail?.length) {
    state.buffer.push(tail);
    state.length += state.objectMode ? 1 : tail.length;
  }

  state.ended = true;

  if (!state.sync) {
    state.needReadable = false;
    state.emittedReadable = true;
    emitReadable_(stream);
    return;
  }
  emitReadable(stream);
}
/**
 * Schedules a 'readable' notification on the next tick, at most once until
 * `emittedReadable` is cleared again.
 */
function emitReadable(stream) {
  const state = stream._readableState;
  debug("emitReadable", state.needReadable, state.emittedReadable);
  state.needReadable = false;
  if (state.emittedReadable) {
    return;
  }
  debug("emitReadable", state.flowing);
  state.emittedReadable = true;
  nextTick(emitReadable_, stream);
}
/**
 * Emits 'readable' when the stream is healthy and has data or has ended,
 * recomputes `needReadable`, then lets flowing mode drain the buffer.
 */
function emitReadable_(stream) {
  const state = stream._readableState;
  debug("emitReadable_", state.destroyed, state.length, state.ended);

  const healthy = !state.destroyed && !state.errored;
  if (healthy && (state.length || state.ended)) {
    stream.emit("readable");
    state.emittedReadable = false;
  }

  // Another 'readable' is wanted only when paused, not ended and at or
  // below the high-water mark.
  state.needReadable = !(state.flowing || state.ended) &&
    state.length <= state.highWaterMark;
  flow(stream);
}
/**
 * Schedules a single read-ahead pass on the next tick, unless one is already
 * pending or the stream is not constructed yet.
 */
function maybeReadMore(stream, state) {
  if (state.readingMore || !state.constructed) {
    return;
  }
  state.readingMore = true;
  nextTick(maybeReadMore_, stream, state);
}
/**
 * Keeps issuing `read(0)` while the stream is idle, not ended and either
 * below its high-water mark or flowing with an empty buffer. Stops as soon
 * as a pass adds nothing to the buffer.
 */
function maybeReadMore_(stream, state) {
  const wantsMore = () =>
    !state.reading && !state.ended &&
    (state.length < state.highWaterMark ||
      (state.flowing && state.length === 0));

  while (wantsMore()) {
    const before = state.length;
    debug("maybeReadMore read 0");
    stream.read(0);
    if (state.length === before) {
      // No progress: nothing was added synchronously.
      break;
    }
  }
  state.readingMore = false;
}
/**
 * Default `_read`: always throws. Replaced per instance by `options.read`
 * in the constructor or by `wrap()`.
 * @throws {ERR_METHOD_NOT_IMPLEMENTED}
 */
Readable.prototype._read = function (n) {
  const notImplemented = new ERR_METHOD_NOT_IMPLEMENTED("_read()");
  throw notImplemented;
};
/**
 * Pipes this stream into `dest`: forwards 'data' chunks with `dest.write()`,
 * pauses while a destination asks for back-pressure and resumes on 'drain'.
 *
 * @param {object} dest Destination stream (needs write/end and emitter
 *   methods).
 * @param {object} [pipeOpts] `{ end: false }` keeps `dest` open when this
 *   stream ends.
 * @returns {object} `dest`, so calls can be chained.
 */
Readable.prototype.pipe = function (dest, pipeOpts) {
const src = this;
const state = this._readableState;
// Adding a second destination switches drain tracking from a single
// writer reference to a Set of writers.
if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
state.awaitDrainWriters = new Set(
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
);
}
}
state.pipes.push(dest);
debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts);
// By default the destination is ended together with the source.
const doEnd = (!pipeOpts || pipeOpts.end !== false)
const endFn = doEnd ? onend : unpipe;
if (state.endEmitted) {
nextTick(endFn);
} else {
src.once("end", endFn);
}
dest.on("unpipe", onunpipe);
// Runs cleanup once when this source is unpiped from dest.
function onunpipe(readable, unpipeInfo) {
debug("onunpipe");
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}
function onend() {
debug("onend");
dest.end();
}
let ondrain;
let cleanedUp = false;
// Removes every listener this pipe installed on src and dest.
function cleanup() {
debug("cleanup");
dest.removeListener("close", onclose);
dest.removeListener("finish", onfinish);
if (ondrain) {
dest.removeListener("drain", ondrain);
}
dest.removeListener("error", onerror);
dest.removeListener("unpipe", onunpipe);
src.removeListener("end", onend);
src.removeListener("end", unpipe);
src.removeListener("data", ondata);
cleanedUp = true;
// If a drain is still awaited, run the drain handler now: its listener
// has just been removed, so 'drain' could no longer trigger it.
if (
ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain)
) {
ondrain();
}
}
// Records dest as awaiting drain and pauses the source, unless cleanup
// already ran; installs the drain handler the first time it is needed.
function pause() {
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug("false write response, pause", 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug("false write response, pause", state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
ondrain = pipeOnDrain(src, dest);
dest.on("drain", ondrain);
}
}
src.on("data", ondata);
// Forwards a chunk; a `false` write result means back-pressure.
function ondata(chunk) {
debug("ondata");
const ret = dest.write(chunk);
debug("dest.write", ret);
if (ret === false) {
pause();
}
}
// On a destination error: unpipe, and if no other 'error' listener is
// left, report the error on dest.
function onerror(er) {
debug("onerror", er);
unpipe();
dest.removeListener("error", onerror);
if (EventEmitter.listenerCount(dest, "error") === 0) {
const s = dest._writableState || dest._readableState;
if (s && !s.errorEmitted) {
errorOrDestroy(dest, er);
} else {
dest.emit("error", er);
}
}
}
// Registered through prependListener rather than dest.on().
prependListener(dest, "error", onerror);
function onclose() {
dest.removeListener("finish", onfinish);
unpipe();
}
dest.once("close", onclose);
function onfinish() {
debug("onfinish");
dest.removeListener("close", onclose);
unpipe();
}
dest.once("finish", onfinish);
function unpipe() {
debug("unpipe");
src.unpipe(dest);
}
dest.emit("pipe", src);
// Honour a destination that already needs draining; otherwise start the
// flow if it is not running yet.
if (dest.writableNeedDrain === true) {
pause();
} else if (!state.flowing) {
debug("pipe resume");
src.resume();
}
return dest;
};
/**
 * Builds the 'drain' handler for one source/destination pair. The handler
 * removes `dest` from the writers being waited on and restarts the flow
 * once nobody is waiting and the source still has 'data' listeners.
 */
function pipeOnDrain(src, dest) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;

    if (state.awaitDrainWriters === dest) {
      debug("pipeOnDrain", 1);
      state.awaitDrainWriters = null;
    } else if (state.multiAwaitDrain) {
      debug("pipeOnDrain", state.awaitDrainWriters.size);
      state.awaitDrainWriters.delete(dest);
    }

    const waiting = state.awaitDrainWriters;
    const nobodyWaiting = !waiting || waiting.size === 0;
    if (nobodyWaiting && EventEmitter.listenerCount(src, "data")) {
      state.flowing = true;
      flow(src);
    }
  };
}
/**
 * Detaches one destination, or all of them when `dest` is omitted, and
 * notifies each detached destination with an 'unpipe' event.
 *
 * @param {object} [dest] Destination to detach.
 * @returns {this}
 */
Readable.prototype.unpipe = function (dest) {
  const state = this._readableState;
  if (state.pipes.length === 0) {
    return this;
  }

  if (!dest) {
    // Detach everything: swap the list out first, then notify.
    const detached = state.pipes;
    state.pipes = [];
    this.pause();
    for (const target of detached) {
      target.emit("unpipe", this, { hasUnpiped: false });
    }
    return this;
  }

  const position = state.pipes.indexOf(dest);
  if (position === -1) {
    return this;
  }
  state.pipes.splice(position, 1);
  if (state.pipes.length === 0) {
    this.pause();
  }
  dest.emit("unpipe", this, { hasUnpiped: false });
  return this;
};
/**
 * Registers a listener and adjusts the reading mode: a 'data' listener
 * starts the flow unless the stream was explicitly paused; the first
 * 'readable' listener switches the stream to paused mode.
 *
 * @returns {*} The result of `Stream.prototype.on`.
 */
Readable.prototype.on = function (ev, fn) {
  const result = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  switch (ev) {
    case "data":
      // Refresh the flag; listeners may have been removed in the meantime.
      state.readableListening = this.listenerCount("readable") > 0;
      if (state.flowing !== false) {
        this.resume();
      }
      break;
    case "readable":
      if (state.endEmitted || state.readableListening) {
        break;
      }
      state.needReadable = true;
      state.readableListening = true;
      state.flowing = false;
      state.emittedReadable = false;
      debug("on readable", state.length, state.reading);
      if (state.length) {
        emitReadable(this);
      } else if (!state.reading) {
        nextTick(nReadingNextTick, this);
      }
      break;
    default:
      break;
  }
  return result;
};
Readable.prototype.addListener = Readable.prototype.on;
/**
 * Registers a one-shot listener through `this.on()`, so the mode switching
 * done by `on()` also applies to `once()`.
 *
 * The wrapper removes itself before calling `fn` and exposes the original
 * function as `.listener`.
 *
 * @param {string|symbol} ev Event name.
 * @param {Function} fn Listener to call at most once.
 * @returns {*} The result of `this.on()`.
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
Readable.prototype.once = function (ev, fn) {
  // Validate here: the wrapper is always a function, so a bad listener
  // would otherwise only fail when the event is emitted.
  if (typeof fn !== "function") {
    throw new ERR_INVALID_ARG_TYPE("listener", "Function", fn);
  }
  const onceWrapper = (...args) => {
    this.removeListener(ev, onceWrapper);
    fn.apply(this, args);
  };
  onceWrapper.listener = fn;
  return this.on(ev, onceWrapper);
};
/**
 * Removes a listener. When a 'readable' listener goes away the reading mode
 * is re-evaluated on the next tick.
 *
 * @returns {*} The result of `Stream.prototype.removeListener`.
 */
Readable.prototype.removeListener = function (ev, fn) {
  const result = Reflect.apply(Stream.prototype.removeListener, this, [ev, fn]);
  if (ev === "readable") {
    // Deferred so that removing and re-adding a listener in the same tick
    // does not flip the mode in between.
    nextTick(updateReadableListening, this);
  }
  return result;
};
Readable.prototype.off = Readable.prototype.removeListener;
/**
 * Removes all listeners (optionally for a single event) and re-evaluates
 * the reading mode on the next tick when 'readable' listeners may have been
 * affected.
 *
 * @returns {*} The result of `Stream.prototype.removeAllListeners`.
 */
Readable.prototype.removeAllListeners = function (ev) {
  const result = Reflect.apply(
    Stream.prototype.removeAllListeners,
    this,
    arguments,
  );
  const touchesReadable = ev === "readable" || ev === undefined;
  if (touchesReadable) {
    nextTick(updateReadableListening, this);
  }
  return result;
};
/**
 * Re-derives the reading mode after 'readable' listeners were removed.
 */
function updateReadableListening(self) {
  const state = self._readableState;
  state.readableListening = self.listenerCount("readable") > 0;

  // A resume was requested while 'readable' listeners were present: it may
  // take effect now.
  if (state.resumeScheduled && state[kPaused] === false) {
    state.flowing = true;
    return;
  }
  // 'data' listeners are waiting, so start flowing.
  if (self.listenerCount("data") > 0) {
    self.resume();
    return;
  }
  // Nobody is listening any more: back to the initial, undecided mode.
  if (!state.readableListening) {
    state.flowing = null;
  }
}
/**
 * Next-tick helper used by `on('readable')`: issues a `read(0)` on the
 * stream.
 */
function nReadingNextTick(self) {
  debug("readable nexttick read 0");
  const nothingRequested = 0;
  self.read(nothingRequested);
}
/**
 * Requests flowing mode. While 'readable' listeners are attached the stream
 * stays non-flowing, but the resume is still scheduled.
 *
 * @returns {this}
 */
Readable.prototype.resume = function () {
  const state = this._readableState;
  const alreadyFlowing = Boolean(state.flowing);
  if (!alreadyFlowing) {
    debug("resume");
    // 'readable' listeners take precedence over flowing mode.
    state.flowing = !state.readableListening;
    resume(this, state);
  }
  state[kPaused] = false;
  return this;
};
/**
 * Schedules the actual resume work for the next tick, at most once until
 * `resumeScheduled` is cleared again.
 */
function resume(stream, state) {
  if (state.resumeScheduled) {
    return;
  }
  state.resumeScheduled = true;
  nextTick(resume_, stream, state);
}
/**
 * Deferred half of `resume()`: issues a read when idle, emits 'resume',
 * drains the buffer in flowing mode and issues one more read if the stream
 * is still flowing and idle afterwards.
 */
function resume_(stream, state) {
  debug("resume", state.reading);
  const kick = () => {
    if (!state.reading) {
      stream.read(0);
    }
  };

  kick();
  state.resumeScheduled = false;
  stream.emit("resume");
  flow(stream);
  if (state.flowing) {
    kick();
  }
}
/**
 * Leaves flowing mode (emitting 'pause' if the stream was not already
 * non-flowing) and records the explicit pause.
 *
 * @returns {this}
 */
Readable.prototype.pause = function () {
  const state = this._readableState;
  debug("call pause flowing=%j", state.flowing);
  const wasNotPaused = state.flowing !== false;
  if (wasNotPaused) {
    debug("pause");
    state.flowing = false;
    this.emit("pause");
  }
  this._readableState[kPaused] = true;
  return this;
};
/**
 * Drains the buffer in flowing mode: keeps calling `read()` until the
 * stream stops flowing or a read returns `null`.
 */
function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  for (;;) {
    if (!state.flowing) {
      break;
    }
    if (stream.read() === null) {
      break;
    }
  }
}
/**
 * Adapts an old-style stream (one that only emits events) to this Readable.
 *
 * 'data' chunks are pushed into this stream, pausing the source when
 * `push()` reports back-pressure; 'end' pushes EOF; 'error' is reported
 * through `errorOrDestroy`; 'close' and 'destroy' destroy this stream.
 * `_read` is replaced so that a paused source is resumed when more data is
 * wanted. Every own enumerable function of `stream` that this stream does
 * not already define is proxied, bound to `stream`.
 *
 * @param {object} stream Old-style stream to wrap.
 * @returns {this}
 */
Readable.prototype.wrap = function (stream) {
  let paused = false;
  stream.on("data", (chunk) => {
    if (!this.push(chunk) && stream.pause) {
      paused = true;
      stream.pause();
    }
  });
  stream.on("end", () => {
    this.push(null);
  });
  stream.on("error", (err) => {
    errorOrDestroy(this, err);
  });
  stream.on("close", () => {
    this.destroy();
  });
  stream.on("destroy", () => {
    this.destroy();
  });
  this._read = () => {
    if (paused && stream.resume) {
      paused = false;
      stream.resume();
    }
  };
  // Proxy every own method, including the first key (the previous index
  // loop started at 1 and silently skipped it).
  for (const key of Object.keys(stream)) {
    if (this[key] === undefined && typeof stream[key] === "function") {
      this[key] = stream[key].bind(stream);
    }
  }
  return this;
};
/**
 * Async-iteration entry point used by `for await`; uses the default
 * iterator options.
 */
Readable.prototype[Symbol.asyncIterator] = function () {
  const iterator = streamToAsyncIterator(this);
  return iterator;
};
/**
 * Creates an async iterator with explicit options.
 *
 * @param {object} [options] Validated as an object when given.
 */
Readable.prototype.iterator = function (options) {
  const hasOptions = options !== undefined;
  if (hasOptions) {
    validateObject(options, "options");
  }
  return streamToAsyncIterator(this, options);
};
/**
 * Wraps the async generator from `createAsyncIterator` in a plain iterator
 * object that also exposes the underlying stream as `stream`.
 *
 * @param {object} stream A Readable; objects without a `read` function are
 *   first wrapped with `Readable.wrap` in object mode.
 * @param {object} [options] `destroyOnReturn` / `destroyOnError`; both
 *   default to `true`.
 * @returns {object} An async iterator (`next`, `return`, `throw`,
 *   `Symbol.asyncIterator`).
 */
function streamToAsyncIterator(stream, options) {
if (typeof stream.read !== "function") {
stream = Readable.wrap(stream, { objectMode: true });
}
const opts = {
destroyOnReturn: true,
destroyOnError: true,
...options,
};
const gen = createAsyncIterator(stream, opts);
const iter = {
stream: stream,
next: gen.next.bind(gen),
return: function () {
// Marks the state as destroyed before the generator is asked to return,
// under the same condition the generator's own cleanup uses
// (autoDestroy, or 'end' not emitted yet).
// NOTE(review): only the flag is set here. If `destroyImpl.destroyer`
// skips streams that already report destroyed, the real teardown may
// never run - confirm against the destroy implementation.
if (opts.destroyOnReturn) {
const state = stream._readableState;
if (state && !state.destroyed) {
if (state.autoDestroy || !state.endEmitted) {
state.destroyed = true;
}
}
}
return gen.return();
},
throw: gen.throw.bind(gen),
[Symbol.asyncIterator]() { return this; },
};
return iter;
}
/**
 * Async generator that yields chunks from `stream.read()` and waits for
 * 'readable' (or stream completion) when no data is available.
 *
 * @param {object} stream The Readable to iterate.
 * @param {object} opts `destroyOnError` and `destroyOnReturn` control
 *   whether the stream is destroyed when iteration fails or stops early.
 */
async function* createAsyncIterator(stream, opts) {
let callback = nop;
// `next` has two roles: as the 'readable' listener (`this === stream`) it
// wakes a pending wait; as a Promise executor it stores `resolve` so a
// later 'readable' or end-of-stream can wake the loop.
function next(resolve) {
if (this === stream) {
callback();
callback = nop;
} else {
callback = resolve;
}
}
stream.on("readable", next);
// `error` stays undefined while the stream is live, becomes null on a
// clean finish and holds the error otherwise.
let error;
const cleanup = eos(stream, { writable: false }, (err) => {
error = err ? aggregateTwoErrors(error, err) : null;
callback();
callback = nop;
});
try {
while (true) {
const chunk = stream.destroyed ? null : stream.read();
if (chunk !== null) {
yield chunk;
} else if (error) {
throw error;
} else if (error === null) {
return;
} else {
await new Promise(next);
}
}
} catch (err) {
if (opts.destroyOnError) {
destroyImpl.destroyer(stream, err);
}
error = aggregateTwoErrors(error, err);
throw error;
} finally {
// Iteration stopped before the stream finished (no result recorded yet).
if (error === undefined && opts.destroyOnReturn) {
const state = stream._readableState;
if (state.autoDestroy || !state.endEmitted) {
destroyImpl.destroyer(stream, null);
}
}
stream.off("readable", next);
cleanup();
}
}
// Public accessors on Readable.prototype; all of them read from (and a few
// write to) `_readableState`. None is enumerable.
Object.defineProperties(Readable.prototype, {
  // True while the stream can still produce data.
  readable: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return !!state && state.readable !== false && !state.destroyed &&
        !state.errorEmitted && !state.endEmitted;
    },
    set: function (val) {
      if (this._readableState) {
        this._readableState.readable = !!val;
      }
    },
  },
  // Whether any data has been handed out yet.
  readableDidRead: {
    enumerable: false,
    get() {
      return this._readableState.dataEmitted;
    },
  },
  // Destroyed or errored before 'end' was emitted.
  readableAborted: {
    enumerable: false,
    get() {
      const state = this._readableState;
      return !!(state.destroyed || state.errored) &&
        !this._readableState.endEmitted;
    },
  },
  readableHighWaterMark: {
    enumerable: false,
    get() {
      return this._readableState.highWaterMark;
    },
  },
  readableBuffer: {
    enumerable: false,
    get() {
      const state = this._readableState;
      return state && state.buffer;
    },
  },
  readableFlowing: {
    enumerable: false,
    get() {
      return this._readableState.flowing;
    },
    set(state) {
      if (this._readableState) {
        this._readableState.flowing = state;
      }
    },
  },
  readableLength: {
    enumerable: false,
    get: function () {
      return this._readableState.length;
    },
  },
  readableObjectMode: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return state ? state.objectMode : false;
    },
  },
  readableEncoding: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return state ? state.encoding : null;
    },
  },
  destroyed: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      // Reports false when no readable state exists.
      return state === undefined ? false : state.destroyed;
    },
    set: function (value) {
      const state = this._readableState;
      if (state) {
        state.destroyed = value;
      }
    },
  },
  readableEnded: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return state ? state.endEmitted : false;
    },
  },
  errored: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return state ? state.errored : null;
    },
  },
  closed: {
    enumerable: false,
    get: function () {
      const state = this._readableState;
      return state ? state.closed : false;
    },
  },
});
// Accessors on the state object: a derived pipe count and the paused flag.
Object.defineProperties(ReadableState.prototype, {
  // Number of destinations currently piped to.
  pipesCount: {
    get: function () {
      const { pipes } = this;
      return pipes.length;
    },
  },
  // Everything except an explicit `false` counts as paused.
  paused: {
    get: function () {
      return !(this[kPaused] === false);
    },
    set: function (value) {
      this[kPaused] = Boolean(value);
    },
  },
});
/**
 * Takes `n` bytes (or one object in object mode) out of the buffer.
 *
 * @param {number} n Amount requested; a falsy value or anything at or above
 *   the buffered length takes the whole buffer.
 * @param {object} state The stream's readable state.
 * @returns {*} The extracted data, or `null` when nothing is buffered.
 */
function fromList(n, state) {
  if (state.length === 0) {
    return null;
  }
  const list = state.buffer;
  if (state.objectMode) {
    return list.shift();
  }

  const takeEverything = !n || n >= state.length;
  if (!takeEverything) {
    return list.consume(n, state.decoder);
  }

  let everything;
  if (state.decoder) {
    // Decoded streams hold strings.
    everything = list.join("");
  } else if (list.length === 1) {
    everything = list.first();
  } else {
    everything = list.concat(state.length);
  }
  list.clear();
  return everything;
}
/**
 * Marks the stream as ended and schedules the 'end' notification, unless
 * 'end' has already been emitted.
 */
function endReadable(stream) {
  const state = stream._readableState;
  debug("endReadable", state.endEmitted);
  if (state.endEmitted) {
    return;
  }
  state.ended = true;
  nextTick(endReadableNT, state, stream);
}
/**
 * Emits 'end' if the stream is still healthy and fully drained, then either
 * ends the writable side (when half-open is disallowed) or auto-destroys
 * the stream.
 */
function endReadableNT(state, stream) {
  debug("endReadableNT", state.endEmitted, state.length);

  const blocked = state.errorEmitted || state.closeEmitted ||
    state.endEmitted || state.length !== 0;
  if (blocked) {
    return;
  }

  state.endEmitted = true;
  stream.emit("end");

  if (stream.writable && stream.allowHalfOpen === false) {
    nextTick(endWritableNT, stream);
    return;
  }
  if (!state.autoDestroy) {
    return;
  }
  // With a writable side, destroy only when that side auto-destroys too and
  // is already finished or no longer writable.
  const wState = stream._writableState;
  const shouldDestroy = !wState || (
    wState.autoDestroy &&
    (wState.finished || wState.writable === false)
  );
  if (shouldDestroy) {
    stream.destroy();
  }
}
/**
 * Ends the writable side of a stream, provided it is still writable, has
 * not been ended and is not destroyed.
 */
function endWritableNT(stream) {
  const stillOpen = stream.writable && !stream.writableEnded &&
    !stream.destroyed;
  if (!stillOpen) {
    return;
  }
  stream.end();
}
/**
 * Builds a Readable from an iterable by delegating to the shared `from`
 * helper with this module's Readable constructor.
 */
function readableFrom(iterable, opts) {
  const stream = _from(Readable, iterable, opts);
  return stream;
}
/**
 * Creates a new Readable wrapping an old-style stream.
 *
 * Object mode follows `src.readableObjectMode`, then `src.objectMode`, and
 * defaults to `true`; `options` may override it. The `destroy` hook is
 * always this function's own: it destroys `src` and passes the error on.
 */
function wrap(src, options) {
  const config = {
    objectMode: src.readableObjectMode ?? src.objectMode ?? true,
    ...options,
    destroy(err, callback) {
      destroyImpl.destroyer(src, err);
      callback(err);
    },
  };
  const readable = new Readable(config);
  return readable.wrap(src);
}
/**
 * Adapts a web `ReadableStream` to a Readable.
 *
 * @param {ReadableStream} readableStream Source web stream.
 * @param {object} [options] `highWaterMark`, `encoding`, `objectMode`
 *   (default `false`) and `signal`, passed on to the Readable.
 * @returns {Readable}
 * @throws {ERR_INVALID_ARG_TYPE} When `readableStream` is not a
 *   ReadableStream.
 * @throws {ERR_INVALID_ARG_VALUE} When `options.encoding` is not a known
 *   encoding.
 */
function newStreamReadableFromReadableStream(readableStream, options = {}) {
  if (!(readableStream instanceof ReadableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'readableStream',
      'ReadableStream',
      readableStream);
  }
  validateObject(options, 'options');
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;
  const encodingUnknown = encoding !== undefined && !Buffer.isEncoding(encoding);
  if (encodingUnknown) {
    throw new ERR_INVALID_ARG_VALUE('options.encoding', encoding);
  }
  validateBoolean(objectMode, 'options.objectMode');

  const reader = readableStream.getReader();
  // Set once the reader reports closed (cleanly or with an error); a closed
  // reader is not cancelled again on destroy.
  let closed = false;

  // Forward one read result: `done` becomes EOF, otherwise push the value.
  const forward = (result) => {
    if (result.done) {
      readable.push(null);
    } else {
      readable.push(result.value);
    }
  };
  const fail = (error) => destroyImpl.destroyer(readable, error);

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,
    read() {
      reader.read().then(forward, fail);
    },
    destroy(error, callback) {
      // Errors thrown by the callback are rethrown on the next tick rather
      // than inside the promise chain.
      function done() {
        try {
          callback(error);
        } catch (error) {
          nextTick(() => { throw error; });
        }
      }
      if (closed) {
        done();
        return;
      }
      reader.cancel(error).then(done, done);
    },
  });

  reader.closed.then(
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroyImpl.destroyer(readable, error);
    });
  return readable;
}
/**
 * Adapts a Readable to a web `ReadableStream`.
 *
 * @param {object} streamReadable Source stream; must carry an object-typed
 *   `_readableState`.
 * @param {object} [options] Not read by this function.
 * @returns {ReadableStream}
 * @throws {ERR_INVALID_ARG_TYPE} When `streamReadable` has no object-typed
 *   `_readableState`.
 */
function newReadableStreamFromStreamReadable(streamReadable, options = {}) {
if (typeof streamReadable?._readableState !== 'object') {
throw new ERR_INVALID_ARG_TYPE(
'streamReadable',
'stream.Readable',
streamReadable);
}
// A destroyed or non-readable source yields an already-cancelled stream.
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}
const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
// Object-mode sources get a CountQueuingStrategy; others a plain
// highWaterMark strategy object.
const strategy = objectMode
? new CountQueuingStrategy({ highWaterMark })
: { highWaterMark };
let controller;
let wasCanceled = false;
// Forwards a chunk and pauses the source once the queue is full.
function onData(chunk) {
if (Buffer.isBuffer(chunk) && !objectMode)
chunk = new Uint8Array(chunk);
controller.enqueue(chunk);
if (controller.desiredSize <= 0)
streamReadable.pause();
}
// Stay paused until the consumer pulls.
streamReadable.pause();
const cleanup = eos(streamReadable, (error) => {
cleanup();
// Keep a no-op 'error' listener attached after completion.
streamReadable.on('error', () => {});
if (error)
return controller.error(error);
// Nothing to close when the consumer already cancelled.
if (wasCanceled) {
return;
}
controller.close();
});
streamReadable.on('data', onData);
return new ReadableStream({
start(c) { controller = c; },
pull() { streamReadable.resume(); },
cancel(reason) {
wasCanceled = true;
destroyImpl.destroyer(streamReadable, reason);
},
}, strategy);
}
/**
 * Throws an AbortError built from `signal.reason` when the signal is
 * already aborted; does nothing otherwise (including for a missing signal).
 */
function throwIfAborted(signal) {
  const isAborted = signal?.aborted;
  if (!isAborted) {
    return;
  }
  throw new AbortError(signal.reason);
}
/**
 * Reads `options.concurrency` (default 1), floors it and validates it as an
 * integer of at least 1.
 *
 * @returns {number} The validated concurrency.
 */
function validateConcurrency(options) {
  const requested = options?.concurrency;
  const concurrency = requested != null ? Math.floor(requested) : 1;
  validateInteger(concurrency, 'options.concurrency', 1);
  return concurrency;
}
/**
 * Returns a new object-mode stream with the results of calling `fn` on
 * every chunk, in source order.
 *
 * @param {Function} fn Mapper, called as `fn(chunk, { signal })`; may
 *   return a promise.
 * @param {object} [options] `concurrency` (default 1), `highWaterMark`
 *   (default `concurrency - 1`, floored, validated as an integer >= 0) and
 *   `signal`.
 * @returns {Readable} Stream built with `readableFrom`.
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
Readable.prototype.map = function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
}
if (options != null) validateObject(options, 'options');
if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
const concurrency = validateConcurrency(options);
const signal = options?.signal;
let outHWM = concurrency - 1;
if (options?.highWaterMark != null) {
outHWM = Math.floor(options.highWaterMark);
}
validateInteger(outHWM, 'options.highWaterMark', 0);
// Upper bound on queued slots (in flight plus finished but not yet yielded).
const queueHWM = outHWM + concurrency;
const src = this;
return readableFrom((async function* () {
throwIfAborted(signal);
// Internal controller; its signal is what mappers receive. It is aborted
// when the caller's signal aborts.
const ac = new AbortController();
let cleanupSignal;
if (signal) {
const onAbort = () => { ac.abort(signal.reason); };
if (signal.aborted) { ac.abort(signal.reason); }
else {
signal.addEventListener('abort', onAbort, { once: true });
cleanupSignal = () => signal.removeEventListener('abort', onAbort);
}
}
const combinedSignal = ac.signal;
try {
// Slots are queued in source order; the consumer only ever yields the
// head slot, which keeps the output ordered.
const queue = [];
let sourceDone = false;
let inFlight = 0;
let errorOccurred = null;
let resolveNext = null;
let pumpScheduled = false;
let pumping = false;
let repumpRequested = false;
const it = src[Symbol.asyncIterator]();
// Wakes the consumer loop if it is waiting.
function onSlotReady() {
if (resolveNext) {
const r = resolveNext;
resolveNext = null;
r();
}
}
// Requests a pump run in a microtask; while a pump is active the request
// is recorded and replayed when that pump finishes.
function schedulePump() {
if (pumping) {
repumpRequested = true;
return;
}
if (!pumpScheduled) {
pumpScheduled = true;
Promise.resolve().then(() => {
pumpScheduled = false;
pump();
});
}
}
// Pulls from the source and starts mappers while there is capacity. Only
// one pump runs at a time (guarded by `pumping`).
async function pump() {
if (pumping) {
repumpRequested = true;
return;
}
pumping = true;
try {
while (!sourceDone && !combinedSignal.aborted && !errorOccurred && inFlight < concurrency && queue.length < queueHWM) {
let step;
try {
step = await it.next();
} catch (e) {
errorOccurred = e;
onSlotReady();
return;
}
if (step.done) {
sourceDone = true;
onSlotReady();
return;
}
const slot = { resolved: false, value: undefined, error: null };
queue.push(slot);
inFlight++;
const chunk = step.value;
let mapped;
try {
if (combinedSignal.aborted) throw new AbortError(combinedSignal.reason);
mapped = Promise.resolve(fn(chunk, { signal: combinedSignal }));
} catch (e) {
// A synchronous throw from fn is handled like a rejection.
mapped = Promise.reject(e);
}
mapped.then(
(result) => {
slot.value = result;
},
(e) => {
slot.error = e;
if (!errorOccurred) errorOccurred = e;
},
).finally(() => {
slot.resolved = true;
inFlight--;
onSlotReady();
schedulePump();
});
}
} finally {
pumping = false;
if (repumpRequested) {
repumpRequested = false;
schedulePump();
}
}
}
schedulePump();
// Consumer loop: wait for the head slot, surface errors, yield values.
while (true) {
while (queue.length === 0 || !queue[0].resolved) {
if (errorOccurred) throw errorOccurred;
if (sourceDone && queue.length === 0) return;
if (combinedSignal.aborted) throw new AbortError(combinedSignal.reason);
await new Promise((r) => { resolveNext = r; });
}
const slot = queue.shift();
if (slot.error) throw slot.error;
yield slot.value;
schedulePump();
}
} finally {
// Only the listener on the caller's signal is removed here.
// NOTE(review): `ac` is not aborted and `it` is not closed when the
// consumer stops early - confirm this is intended.
if (cleanupSignal) cleanupSignal();
}
})(), { objectMode: true });
};
/**
 * Returns a new object-mode stream containing only the chunks for which
 * `fn` returns (or resolves to) a truthy value, in source order.
 *
 * @param {Function} fn Predicate, called as `fn(chunk, { signal })`; may
 *   return a promise.
 * @param {object} [options] `concurrency` (default 1), `highWaterMark`
 *   (default `concurrency - 1`, floored, validated as an integer >= 0) and
 *   `signal`.
 * @returns {Readable} Stream built with `readableFrom`.
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
Readable.prototype.filter = function filter(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  // Same normalisation and validation as map(): floor, then require an
  // integer >= 0.
  let outHWM = concurrency - 1;
  if (options?.highWaterMark != null) {
    outHWM = Math.floor(options.highWaterMark);
  }
  validateInteger(outHWM, 'options.highWaterMark', 0);
  // Upper bound on queued slots (in flight plus finished but not yet consumed).
  const queueHWM = outHWM + concurrency;
  const src = this;
  return readableFrom((async function* () {
    throwIfAborted(signal);
    // Internal controller; its signal is what predicates receive. It is
    // aborted when the caller's signal aborts.
    const ac = new AbortController();
    let cleanupSignal;
    if (signal) {
      const onAbort = () => { ac.abort(signal.reason); };
      if (signal.aborted) {
        ac.abort(signal.reason);
      } else {
        signal.addEventListener('abort', onAbort, { once: true });
        cleanupSignal = () => signal.removeEventListener('abort', onAbort);
      }
    }
    const combinedSignal = ac.signal;
    try {
      // Slots are queued in source order; the consumer only looks at the
      // head slot, which keeps the output ordered.
      const queue = [];
      let sourceDone = false;
      let inFlight = 0;
      let errorOccurred = null;
      let resolveNext = null;
      let pumpScheduled = false;
      let pumping = false;
      let repumpRequested = false;
      const it = src[Symbol.asyncIterator]();
      // Wakes the consumer loop if it is waiting.
      function onSlotReady() {
        if (resolveNext) {
          const r = resolveNext;
          resolveNext = null;
          r();
        }
      }
      // Requests a pump run in a microtask; while a pump is active the
      // request is recorded and replayed when that pump finishes.
      function schedulePump() {
        if (pumping) {
          repumpRequested = true;
          return;
        }
        if (!pumpScheduled) {
          pumpScheduled = true;
          Promise.resolve().then(() => {
            pumpScheduled = false;
            pump();
          });
        }
      }
      // Pulls from the source and starts predicates while there is capacity.
      // Only one pump may run at a time: two overlapping pumps could both
      // pass the capacity check before either pushes its slot, exceeding
      // `concurrency`.
      async function pump() {
        if (pumping) {
          repumpRequested = true;
          return;
        }
        pumping = true;
        try {
          while (!sourceDone && !combinedSignal.aborted && !errorOccurred && inFlight < concurrency && queue.length < queueHWM) {
            let step;
            try {
              step = await it.next();
            } catch (e) {
              errorOccurred = e;
              onSlotReady();
              return;
            }
            if (step.done) {
              sourceDone = true;
              onSlotReady();
              return;
            }
            const slot = { resolved: false, include: false, chunk: step.value, error: null };
            queue.push(slot);
            inFlight++;
            const chunk = step.value;
            // The predicate runs in its own microtask; every outcome is
            // recorded on the slot, so this promise never rejects.
            Promise.resolve().then(async () => {
              try {
                if (combinedSignal.aborted) throw new AbortError(combinedSignal.reason);
                const result = await fn(chunk, { signal: combinedSignal });
                slot.include = !!result;
              } catch (e) {
                slot.error = e;
                if (!errorOccurred) errorOccurred = e;
              } finally {
                slot.resolved = true;
                inFlight--;
                onSlotReady();
                schedulePump();
              }
            });
          }
        } finally {
          pumping = false;
          if (repumpRequested) {
            repumpRequested = false;
            schedulePump();
          }
        }
      }
      pump();
      // Consumer loop: wait for the head slot, surface errors, yield the
      // chunks that passed the predicate.
      while (true) {
        while (queue.length === 0 || !queue[0].resolved) {
          if (errorOccurred) throw errorOccurred;
          if (sourceDone && queue.length === 0) return;
          if (combinedSignal.aborted) throw new AbortError(combinedSignal.reason);
          await new Promise((r) => { resolveNext = r; });
        }
        const slot = queue.shift();
        if (slot.error) throw slot.error;
        if (slot.include) yield slot.chunk;
        schedulePump();
      }
    } finally {
      if (cleanupSignal) cleanupSignal();
    }
  })(), { objectMode: true, highWaterMark: outHWM });
};
/**
 * Creates an object-mode Readable that maps every chunk through `fn` and
 * flattens the results in source order: async-iterable and iterable results
 * are delegated to with `yield*`, anything else is emitted as a single chunk.
 *
 * @param {Function} fn Mapper called as `fn(chunk, { signal })`; may be async.
 * @param {object} [options] May carry `concurrency`, `highWaterMark` and `signal`.
 * @returns {*} Whatever `readableFrom()` builds from the flattening generator.
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
Readable.prototype.flatMap = function flatMap(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  const hwm = options?.highWaterMark;
  // Without an explicit highWaterMark, buffer up to `concurrency - 1` extra results.
  const outHWM = hwm != null ? hwm : concurrency - 1;
  const src = this;
  return readableFrom((async function* () {
    throwIfAborted(signal);
    // Internal controller that mirrors the caller's signal; its signal is the
    // one handed to `fn`.
    const ac = new AbortController();
    let cleanupSignal;
    if (signal) {
      const onAbort = () => { ac.abort(signal.reason); };
      if (signal.aborted) {
        ac.abort(signal.reason);
      } else {
        signal.addEventListener('abort', onAbort, { once: true });
        cleanupSignal = () => signal.removeEventListener('abort', onAbort);
      }
    }
    const combinedSignal = ac.signal;
    try {
      // Slots are queued in source order; only a resolved head is consumed.
      const queue = [];
      let sourceDone = false;
      let inFlight = 0;
      let errorOccurred = null;
      let resolveNext = null;
      let pumpScheduled = false;
      // True while a pump loop is active. Without this guard a second loop
      // could start while the first is parked on `it.next()`, and the two
      // together could run more than `concurrency` calls of `fn` at once.
      let pumping = false;
      const it = src[Symbol.asyncIterator]();
      // Wakes the consumer loop if it is currently parked.
      function onSlotReady() {
        if (resolveNext) {
          const r = resolveNext;
          resolveNext = null;
          r();
        }
      }
      // Coalesces pump requests into a single microtask.
      function schedulePump() {
        if (!pumpScheduled) {
          pumpScheduled = true;
          Promise.resolve().then(() => {
            pumpScheduled = false;
            pump();
          });
        }
      }
      // Pulls chunks from the source and starts `fn` on each one while the
      // concurrency and buffering limits allow it.
      async function pump() {
        if (pumping) return;
        pumping = true;
        try {
          while (!sourceDone && !ac.signal.aborted && !errorOccurred && inFlight < concurrency && queue.length < concurrency + outHWM) {
            let step;
            try {
              step = await it.next();
            } catch (e) {
              errorOccurred = e;
              onSlotReady();
              return;
            }
            if (step.done) {
              sourceDone = true;
              onSlotReady();
              return;
            }
            const slot = { resolved: false, value: undefined, error: null };
            queue.push(slot);
            inFlight++;
            const chunk = step.value;
            Promise.resolve().then(async () => {
              try {
                if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
                const result = await fn(chunk, { signal: combinedSignal });
                slot.value = result;
              } catch (e) {
                slot.error = e;
                if (!errorOccurred) errorOccurred = e;
              } finally {
                slot.resolved = true;
                inFlight--;
                onSlotReady();
                schedulePump();
              }
            });
          }
        } finally {
          pumping = false;
        }
      }
      pump();
      while (true) {
        // Park until the head slot is resolved, the source ends, or a failure
        // or abort is observed.
        while (queue.length === 0 || !queue[0].resolved) {
          if (errorOccurred) throw errorOccurred;
          if (sourceDone && queue.length === 0) return;
          if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
          await new Promise(r => { resolveNext = r; });
        }
        const slot = queue.shift();
        if (slot.error) throw slot.error;
        const out = slot.value;
        if (out && typeof out[Symbol.asyncIterator] === 'function') {
          yield* out;
        } else if (out && typeof out[Symbol.iterator] === 'function') {
          yield* out;
        } else {
          // Non-iterable results are passed through as one chunk.
          yield out;
        }
        // A queue position was freed, so the pump may be able to continue.
        schedulePump();
      }
    } finally {
      if (cleanupSignal) cleanupSignal();
    }
  })(), { objectMode: true, highWaterMark: outHWM });
};
/**
 * Creates an object-mode Readable that emits at most the first `n` chunks of
 * this stream.
 *
 * The source is not asked for another chunk once `n` chunks have been
 * yielded, so nothing beyond the requested prefix is consumed.
 *
 * @param {number} n Maximum number of chunks; coerced with `Number()`, NaN counts as 0.
 * @param {object} [options] May carry an AbortSignal as `signal`.
 * @returns {*} Whatever `readableFrom()` builds from the limiting generator.
 * @throws {ERR_OUT_OF_RANGE} When `n` is negative.
 */
Readable.prototype.take = function take(n, options) {
  // Number() also accepts BigInt counts, which unary plus rejects.
  n = Number(n);
  if (n < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', n);
  }
  if (Number.isNaN(n)) n = 0;
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const signal = options?.signal;
  const src = this;
  return readableFrom((async function* () {
    throwIfAborted(signal);
    let remaining = n;
    for await (const chunk of src) {
      throwIfAborted(signal);
      if (remaining-- > 0) yield chunk;
      // Leave the loop right away instead of pulling a chunk that would be discarded.
      if (remaining <= 0) return;
    }
  })(), { objectMode: true });
};
/**
 * Creates an object-mode Readable that skips the first `n` chunks of this
 * stream and emits every chunk after them.
 *
 * @param {number} n Number of leading chunks to skip; coerced with unary plus, NaN counts as 0.
 * @param {object} [options] May carry an AbortSignal as `signal`.
 * @returns {*} Whatever `readableFrom()` builds from the skipping generator.
 * @throws {ERR_OUT_OF_RANGE} When `n` is negative.
 */
Readable.prototype.drop = function drop(n, options) {
  let skipCount = +n;
  if (skipCount < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', skipCount);
  }
  if (Number.isNaN(skipCount)) {
    skipCount = 0;
  }
  if (options != null) {
    validateObject(options, 'options');
  }
  if (options?.signal != null) {
    validateAbortSignal(options.signal, 'options.signal');
  }
  const abortSignal = options?.signal;
  const source = this;

  async function* skipLeading() {
    throwIfAborted(abortSignal);
    let position = 0;
    for await (const chunk of source) {
      throwIfAborted(abortSignal);
      const index = position;
      position += 1;
      // Chunks at an index below the skip count are discarded.
      if (index >= skipCount) {
        yield chunk;
      }
    }
  }

  return readableFrom(skipLeading(), { objectMode: true });
};
/**
 * Collects every chunk of this stream into an array.
 *
 * @param {object} [options] May carry an AbortSignal as `signal`; it is checked
 *   before reading starts and again for every chunk received.
 * @returns {Promise<Array>} The chunks in the order they were read.
 */
Readable.prototype.toArray = async function toArray(options) {
  if (options != null) {
    validateObject(options, 'options');
  }
  if (options?.signal != null) {
    validateAbortSignal(options.signal, 'options.signal');
  }
  const abortSignal = options?.signal;
  throwIfAborted(abortSignal);

  const collected = [];
  for await (const item of this) {
    throwIfAborted(abortSignal);
    collected.push(item);
  }
  return collected;
};
/**
 * Calls `fn` for every chunk of this stream, running up to `concurrency`
 * calls at once, and settles once every call has settled.
 *
 * @param {Function} fn Called as `fn(chunk, { signal })`; may be async.
 * @param {object} [options] May carry `concurrency` and `signal`.
 * @returns {Promise<undefined>} Resolves after the source ended and all calls finished;
 *   rejects with the first failure observed, or with an AbortError on abort.
 */
Readable.prototype.forEach = async function forEach(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  const src = this;
  // Internal controller that mirrors the caller's signal; its signal is the
  // one handed to `fn`.
  const ac = new AbortController();
  let cleanupSignal;
  if (signal) {
    const onAbort = () => { ac.abort(signal.reason); };
    if (signal.aborted) {
      ac.abort(signal.reason);
    } else {
      signal.addEventListener('abort', onAbort, { once: true });
      cleanupSignal = () => signal.removeEventListener('abort', onAbort);
    }
  }
  const combinedSignal = ac.signal;
  try {
    throwIfAborted(signal);
    // Slots are queued in source order; only a resolved head is consumed.
    const queue = [];
    let sourceDone = false;
    let inFlight = 0;
    let errorOccurred = null;
    let resolveNext = null;
    let pumpScheduled = false;
    // True while a pump loop is active. Without this guard overlapping loops
    // could each pass the `inFlight < concurrency` check before awaiting the
    // source and together run more than `concurrency` calls of `fn` at once.
    let pumping = false;
    const it = src[Symbol.asyncIterator]();
    // Wakes the consumer loop if it is currently parked.
    function onSlotReady() {
      if (resolveNext) {
        const r = resolveNext;
        resolveNext = null;
        r();
      }
    }
    // Coalesces pump requests into a single microtask.
    function schedulePump() {
      if (!pumpScheduled) {
        pumpScheduled = true;
        Promise.resolve().then(() => {
          pumpScheduled = false;
          pump();
        });
      }
    }
    // Pulls chunks from the source and starts `fn` on each one while the
    // concurrency limit allows it.
    async function pump() {
      if (pumping) return;
      pumping = true;
      try {
        while (!sourceDone && !ac.signal.aborted && !errorOccurred && inFlight < concurrency && queue.length < concurrency) {
          let step;
          try {
            step = await it.next();
          } catch (e) {
            errorOccurred = e;
            onSlotReady();
            return;
          }
          if (step.done) {
            sourceDone = true;
            onSlotReady();
            return;
          }
          const slot = { resolved: false, error: null };
          queue.push(slot);
          inFlight++;
          const chunk = step.value;
          Promise.resolve().then(async () => {
            try {
              if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
              await fn(chunk, { signal: combinedSignal });
            } catch (e) {
              slot.error = e;
              if (!errorOccurred) errorOccurred = e;
            } finally {
              slot.resolved = true;
              inFlight--;
              onSlotReady();
              schedulePump();
            }
          });
        }
      } finally {
        pumping = false;
      }
    }
    pump();
    while (true) {
      // Park until the head slot is resolved, the source ends, or a failure
      // or abort is observed.
      while (queue.length === 0 || !queue[0].resolved) {
        if (errorOccurred) throw errorOccurred;
        if (sourceDone && queue.length === 0) return;
        if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
        await new Promise(r => { resolveNext = r; });
      }
      const slot = queue.shift();
      if (slot.error) throw slot.error;
      // A queue position was freed, so the pump may be able to continue.
      schedulePump();
    }
  } finally {
    if (cleanupSignal) cleanupSignal();
  }
};
/**
 * Folds the chunks of this stream into a single value.
 *
 * When no initial value is passed, the first chunk seeds the accumulator and
 * `fn` is first called for the second chunk. `options` is only ever read from
 * the third positional argument, so an object or array initial value is never
 * mistaken for an options bag.
 *
 * @param {Function} fn Reducer called as `fn(accumulator, chunk, { signal })`; may be async.
 * @param {*} [initialValue] Starting accumulator; counts as provided whenever a
 *   second argument is passed, even if it is `undefined`.
 * @param {object} [options] May carry an AbortSignal as `signal`.
 * @returns {Promise<*>} The final accumulator.
 * @throws {TypeError} When `fn` is not a function, or the stream is empty and
 *   no initial value was passed.
 */
Readable.prototype.reduce = async function reduce(fn, initialValue, options) {
  if (typeof fn !== 'function') {
    throw new TypeError('fn must be a function');
  }
  // `arguments` is used so that an explicit `undefined` still counts as an initial value.
  const hasInitial = arguments.length >= 2;
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const signal = options?.signal;
  if (signal?.aborted) {
    this.destroy();
    throw new AbortError(signal.reason);
  }
  let acc;
  let started = hasInitial;
  if (started) acc = initialValue;
  let cleanupSignal;
  if (signal) {
    // Destroy the source on abort; the abort itself is reported by the
    // throwIfAborted() checks below.
    const onAbort = () => {
      this.destroy();
    };
    signal.addEventListener('abort', onAbort, { once: true });
    cleanupSignal = () => signal.removeEventListener('abort', onAbort);
  }
  try {
    for await (const chunk of this) {
      throwIfAborted(signal);
      if (!started) {
        acc = chunk;
        started = true;
        continue;
      }
      acc = await fn(acc, chunk, { signal });
    }
    // Checked again after the loop in case the abort ended the iteration.
    throwIfAborted(signal);
  } finally {
    if (cleanupSignal) cleanupSignal();
  }
  if (!started) throw new TypeError('Reduce of empty stream with no initial value');
  return acc;
};
/**
 * Reports whether `fn` resolves to a truthy value for at least one chunk of
 * this stream, running up to `concurrency` calls at once.
 *
 * @param {Function} fn Predicate called as `fn(chunk, { signal })`; may be async.
 * @param {object} [options] May carry `concurrency` and `signal`.
 * @returns {Promise<boolean>} `true` once a truthy result reaches the head of the
 *   queue, `false` when the source ends without one.
 */
Readable.prototype.some = async function some(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  const src = this;
  // Internal controller that mirrors the caller's signal; its signal is the
  // one handed to `fn`.
  const ac = new AbortController();
  let cleanupSignal;
  if (signal) {
    const onAbort = () => { ac.abort(signal.reason); };
    if (signal.aborted) {
      ac.abort(signal.reason);
    } else {
      signal.addEventListener('abort', onAbort, { once: true });
      cleanupSignal = () => signal.removeEventListener('abort', onAbort);
    }
  }
  const combinedSignal = ac.signal;
  try {
    throwIfAborted(signal);
    // Set as soon as any call reports a match so no further chunks are pulled.
    let found = false;
    // Slots are queued in source order; only a resolved head is consumed.
    const queue = [];
    let sourceDone = false;
    let inFlight = 0;
    let errorOccurred = null;
    let resolveNext = null;
    let pumpScheduled = false;
    // True while a pump loop is active. Without this guard overlapping loops
    // could each pass the `inFlight < concurrency` check before awaiting the
    // source and together run more than `concurrency` calls of `fn` at once.
    let pumping = false;
    const it = src[Symbol.asyncIterator]();
    // Wakes the consumer loop if it is currently parked.
    function onSlotReady() {
      if (resolveNext) {
        const r = resolveNext;
        resolveNext = null;
        r();
      }
    }
    // Coalesces pump requests into a single microtask.
    function schedulePump() {
      if (!pumpScheduled) {
        pumpScheduled = true;
        Promise.resolve().then(() => {
          pumpScheduled = false;
          pump();
        });
      }
    }
    // Pulls chunks from the source and starts `fn` on each one while the
    // concurrency limit allows it and no match has been seen.
    async function pump() {
      if (pumping) return;
      pumping = true;
      try {
        while (!sourceDone && !found && !ac.signal.aborted && !errorOccurred && inFlight < concurrency) {
          let step;
          try {
            step = await it.next();
          } catch (e) {
            errorOccurred = e;
            onSlotReady();
            return;
          }
          if (step.done) {
            sourceDone = true;
            onSlotReady();
            return;
          }
          const slot = { resolved: false, result: false, error: null };
          queue.push(slot);
          inFlight++;
          const chunk = step.value;
          Promise.resolve().then(async () => {
            try {
              if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
              slot.result = !!(await fn(chunk, { signal: combinedSignal }));
              if (slot.result) found = true;
            } catch (e) {
              slot.error = e;
              if (!errorOccurred) errorOccurred = e;
            } finally {
              slot.resolved = true;
              inFlight--;
              onSlotReady();
              if (!found) schedulePump();
            }
          });
        }
      } finally {
        pumping = false;
      }
    }
    pump();
    while (true) {
      // Park until the head slot is resolved, the source ends, or a failure
      // or abort is observed.
      while (queue.length === 0 || !queue[0].resolved) {
        if (errorOccurred) throw errorOccurred;
        if (sourceDone && queue.length === 0) return false;
        if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
        await new Promise(r => { resolveNext = r; });
      }
      const slot = queue.shift();
      if (slot.error) throw slot.error;
      if (slot.result) return true;
      schedulePump();
    }
  } finally {
    if (cleanupSignal) cleanupSignal();
  }
};
/**
 * Reports whether `fn` resolves to a truthy value for every chunk of this
 * stream, running up to `concurrency` calls at once.
 *
 * @param {Function} fn Predicate called as `fn(chunk, { signal })`; may be async.
 * @param {object} [options] May carry `concurrency` and `signal`.
 * @returns {Promise<boolean>} `false` once a falsy result reaches the head of the
 *   queue, `true` when the source ends without one.
 */
Readable.prototype.every = async function every(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  const src = this;
  // Internal controller that mirrors the caller's signal; its signal is the
  // one handed to `fn`.
  const ac = new AbortController();
  let cleanupSignal;
  if (signal) {
    const onAbort = () => { ac.abort(signal.reason); };
    if (signal.aborted) {
      ac.abort(signal.reason);
    } else {
      signal.addEventListener('abort', onAbort, { once: true });
      cleanupSignal = () => signal.removeEventListener('abort', onAbort);
    }
  }
  const combinedSignal = ac.signal;
  try {
    throwIfAborted(signal);
    // Set as soon as any call reports a mismatch so no further chunks are pulled.
    let failed = false;
    // Slots are queued in source order; only a resolved head is consumed.
    const queue = [];
    let sourceDone = false;
    let inFlight = 0;
    let errorOccurred = null;
    let resolveNext = null;
    let pumpScheduled = false;
    // True while a pump loop is active. Without this guard overlapping loops
    // could each pass the `inFlight < concurrency` check before awaiting the
    // source and together run more than `concurrency` calls of `fn` at once.
    let pumping = false;
    const it = src[Symbol.asyncIterator]();
    // Wakes the consumer loop if it is currently parked.
    function onSlotReady() {
      if (resolveNext) {
        const r = resolveNext;
        resolveNext = null;
        r();
      }
    }
    // Coalesces pump requests into a single microtask.
    function schedulePump() {
      if (!pumpScheduled) {
        pumpScheduled = true;
        Promise.resolve().then(() => {
          pumpScheduled = false;
          pump();
        });
      }
    }
    // Pulls chunks from the source and starts `fn` on each one while the
    // concurrency limit allows it and no mismatch has been seen.
    async function pump() {
      if (pumping) return;
      pumping = true;
      try {
        while (!sourceDone && !failed && !ac.signal.aborted && !errorOccurred && inFlight < concurrency) {
          let step;
          try {
            step = await it.next();
          } catch (e) {
            errorOccurred = e;
            onSlotReady();
            return;
          }
          if (step.done) {
            sourceDone = true;
            onSlotReady();
            return;
          }
          const slot = { resolved: false, result: true, error: null };
          queue.push(slot);
          inFlight++;
          const chunk = step.value;
          Promise.resolve().then(async () => {
            try {
              if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
              slot.result = !!(await fn(chunk, { signal: combinedSignal }));
              if (!slot.result) failed = true;
            } catch (e) {
              slot.error = e;
              if (!errorOccurred) errorOccurred = e;
            } finally {
              slot.resolved = true;
              inFlight--;
              onSlotReady();
              if (!failed) schedulePump();
            }
          });
        }
      } finally {
        pumping = false;
      }
    }
    pump();
    while (true) {
      // Park until the head slot is resolved, the source ends, or a failure
      // or abort is observed.
      while (queue.length === 0 || !queue[0].resolved) {
        if (errorOccurred) throw errorOccurred;
        if (sourceDone && queue.length === 0) return true;
        if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
        await new Promise(r => { resolveNext = r; });
      }
      const slot = queue.shift();
      if (slot.error) throw slot.error;
      if (!slot.result) return false;
      schedulePump();
    }
  } finally {
    if (cleanupSignal) cleanupSignal();
  }
};
/**
 * Finds the first chunk, in source order, for which `fn` resolves to a truthy
 * value, running up to `concurrency` calls at once.
 *
 * @param {Function} fn Predicate called as `fn(chunk, { signal })`; may be async.
 * @param {object} [options] May carry `concurrency` and `signal`.
 * @returns {Promise<*>} The matching chunk, or `undefined` when the source ends
 *   without a match.
 */
Readable.prototype.find = async function find(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }
  if (options != null) validateObject(options, 'options');
  if (options?.signal != null) validateAbortSignal(options.signal, 'options.signal');
  const concurrency = validateConcurrency(options);
  const signal = options?.signal;
  const src = this;
  // Internal controller that mirrors the caller's signal; its signal is the
  // one handed to `fn`.
  const ac = new AbortController();
  let cleanupSignal;
  if (signal) {
    const onAbort = () => { ac.abort(signal.reason); };
    if (signal.aborted) {
      ac.abort(signal.reason);
    } else {
      signal.addEventListener('abort', onAbort, { once: true });
      cleanupSignal = () => signal.removeEventListener('abort', onAbort);
    }
  }
  const combinedSignal = ac.signal;
  try {
    throwIfAborted(signal);
    // Set as soon as any call reports a match so no further chunks are pulled.
    let found = false;
    // Slots are queued in source order and each keeps its chunk, so the match
    // returned is the earliest one in the source.
    const queue = [];
    let sourceDone = false;
    let inFlight = 0;
    let errorOccurred = null;
    let resolveNext = null;
    let pumpScheduled = false;
    // True while a pump loop is active. Without this guard overlapping loops
    // could each pass the `inFlight < concurrency` check before awaiting the
    // source and together run more than `concurrency` calls of `fn` at once.
    let pumping = false;
    const it = src[Symbol.asyncIterator]();
    // Wakes the consumer loop if it is currently parked.
    function onSlotReady() {
      if (resolveNext) {
        const r = resolveNext;
        resolveNext = null;
        r();
      }
    }
    // Coalesces pump requests into a single microtask.
    function schedulePump() {
      if (!pumpScheduled) {
        pumpScheduled = true;
        Promise.resolve().then(() => {
          pumpScheduled = false;
          pump();
        });
      }
    }
    // Pulls chunks from the source and starts `fn` on each one while the
    // concurrency limit allows it and no match has been seen.
    async function pump() {
      if (pumping) return;
      pumping = true;
      try {
        while (!sourceDone && !found && !ac.signal.aborted && !errorOccurred && inFlight < concurrency) {
          let step;
          try {
            step = await it.next();
          } catch (e) {
            errorOccurred = e;
            onSlotReady();
            return;
          }
          if (step.done) {
            sourceDone = true;
            onSlotReady();
            return;
          }
          const slot = { resolved: false, matched: false, chunk: step.value, error: null };
          queue.push(slot);
          inFlight++;
          const chunk = step.value;
          Promise.resolve().then(async () => {
            try {
              if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
              slot.matched = !!(await fn(chunk, { signal: combinedSignal }));
              if (slot.matched) found = true;
            } catch (e) {
              slot.error = e;
              if (!errorOccurred) errorOccurred = e;
            } finally {
              slot.resolved = true;
              inFlight--;
              onSlotReady();
              if (!found) schedulePump();
            }
          });
        }
      } finally {
        pumping = false;
      }
    }
    pump();
    while (true) {
      // Park until the head slot is resolved, the source ends, or a failure
      // or abort is observed.
      while (queue.length === 0 || !queue[0].resolved) {
        if (errorOccurred) throw errorOccurred;
        if (sourceDone && queue.length === 0) return undefined;
        if (ac.signal.aborted) throw new AbortError(ac.signal.reason);
        await new Promise(r => { resolveNext = r; });
      }
      const slot = queue.shift();
      if (slot.error) throw slot.error;
      if (slot.matched) return slot.chunk;
      schedulePump();
    }
  } finally {
    if (cleanupSignal) cleanupSignal();
  }
};
// Only installed when the runtime exposes Symbol.asyncDispose.
if (typeof Symbol.asyncDispose !== 'undefined') {
  /**
   * Destroys the stream if it is not destroyed yet (with an AbortError unless
   * it already ended) and waits for `eos` to report completion.
   * Errors named 'AbortError' are treated as a clean shutdown.
   */
  Readable.prototype[Symbol.asyncDispose] = async function() {
    if (!this.destroyed) {
      const reason = this.readableEnded ? null : new AbortError();
      this.destroy(reason);
    }
    await new Promise((resolve, reject) => {
      eos(this, (err) => {
        if (err && err.name !== 'AbortError') {
          reject(err);
        } else {
          resolve(null);
        }
      });
    });
  };
}
// Static members attached to the Readable constructor; each one simply
// exposes a function or constructor defined elsewhere in this module.
Readable._fromList = fromList;
Readable.ReadableState = ReadableState;
Readable.from = readableFrom;
Readable.wrap = wrap;
// Forwards both arguments unchanged to newStreamReadableFromReadableStream().
Readable.fromWeb = function(readableStream, options) {
return newStreamReadableFromReadableStream(readableStream, options);
};
// Forwards both arguments unchanged to newReadableStreamFromStreamReadable().
Readable.toWeb = function(streamReadable, options) {
return newReadableStreamFromStreamReadable(streamReadable, options);
};
// Readable is the default export; the same helpers are also available as
// named exports under the aliases listed below.
export default Readable;
export { fromList as _fromList, readableFrom as from, ReadableState, wrap, newStreamReadableFromReadableStream as fromWeb, newReadableStreamFromStreamReadable as toWeb };