wasm-rquickjs 0.3.4

Tool for wrapping JavaScript modules as WebAssembly components using the QuickJS engine
Documentation
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// deno-lint-ignore-file

import { destroyer } from "__wasm_rquickjs_builtin/internal/streams/destroy";
import eos from "__wasm_rquickjs_builtin/internal/streams/end-of-stream";
import {
    isNodeStream,
    isReadable,
    isReadableStream,
    isTransformStream,
    isWebStream,
    isWritable,
    isWritableStream,
} from "__wasm_rquickjs_builtin/internal/streams/utils";
import { pipeline } from "__wasm_rquickjs_builtin/internal/streams/pipeline";
import {
    AbortError,
    ERR_INVALID_ARG_VALUE,
    ERR_MISSING_ARGS,
} from "__wasm_rquickjs_builtin/internal/errors";
import Duplex from "__wasm_rquickjs_builtin/internal/streams/duplex";

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
    constructor(options) {
        super(options);

        // https://github.com/nodejs/node/pull/34385

        if (options?.readable === false) {
            this._readableState.readable = false;
            this._readableState.ended = true;
            this._readableState.endEmitted = true;
        }

        if (options?.writable === false) {
            this._writableState.writable = false;
            this._writableState.ending = true;
            this._writableState.ended = true;
            this._writableState.finished = true;
        }
    }
}

function compose(...streams) {
    if (streams.length === 0) {
        throw new ERR_MISSING_ARGS("streams");
    }

    if (streams.length === 1) {
        return Duplex.from(streams[0]);
    }

    const orgStreams = [...streams];

    if (typeof streams[0] === "function") {
        streams[0] = Duplex.from(streams[0]);
    }

    if (typeof streams[streams.length - 1] === "function") {
        const idx = streams.length - 1;
        streams[idx] = Duplex.from(streams[idx]);
    }

    for (let n = 0; n < streams.length; ++n) {
        if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
            // TODO(ronag): Add checks for non streams.
            continue;
        }
        if (
            n < streams.length - 1 &&
            !(
                isReadable(streams[n]) ||
                isReadableStream(streams[n]) ||
                isTransformStream(streams[n])
            )
        ) {
            throw new ERR_INVALID_ARG_VALUE(
                `streams[${n}]`,
                orgStreams[n],
                "must be readable",
            );
        }
        if (
            n > 0 &&
            !(
                isWritable(streams[n]) ||
                isWritableStream(streams[n]) ||
                isTransformStream(streams[n])
            )
        ) {
            throw new ERR_INVALID_ARG_VALUE(
                `streams[${n}]`,
                orgStreams[n],
                "must be writable",
            );
        }
    }

    let ondrain;
    let onfinish;
    let onreadable;
    let onclose;
    let d;

    function onfinished(err) {
        const cb = onclose;
        onclose = null;

        if (cb) {
            cb(err);
        } else if (err) {
            d.destroy(err);
        } else if (!readable && !writable) {
            d.destroy();
        }
    }

    const head = streams[0];
    const tail = pipeline(streams, onfinished);

    const writable = !!(
        isWritable(head) ||
        isWritableStream(head) ||
        isTransformStream(head)
    );
    const readable = !!(
        isReadable(tail) ||
        isReadableStream(tail) ||
        isTransformStream(tail)
    );

    // TODO(ronag): Avoid double buffering.
    // Implement Writable/Readable/Duplex traits.
    // See, https://github.com/nodejs/node/pull/33515.
    d = new ComposeDuplex({
        // TODO (ronag): highWaterMark?
        writableObjectMode: !!head?.writableObjectMode,
        readableObjectMode: !!tail?.readableObjectMode,
        writable,
        readable,
    });

    if (writable) {
        if (isNodeStream(head)) {
            d._write = function (chunk, encoding, callback) {
                if (head.write(chunk, encoding)) {
                    callback();
                } else {
                    ondrain = callback;
                }
            };

            d._final = function (callback) {
                head.end();
                onfinish = callback;
            };

            head.on("drain", function () {
                if (ondrain) {
                    const cb = ondrain;
                    ondrain = null;
                    cb();
                }
            });
        } else if (isWebStream(head)) {
            const wsWritable = isTransformStream(head) ? head.writable : head;
            const writer = wsWritable.getWriter();

            d._write = async function (chunk, encoding, callback) {
                try {
                    await writer.ready;
                    writer.write(chunk).catch(() => {});
                    callback();
                } catch (err) {
                    callback(err);
                }
            };

            d._final = async function (callback) {
                try {
                    await writer.ready;
                    writer.close().catch(() => {});
                    onfinish = callback;
                } catch (err) {
                    callback(err);
                }
            };
        }

        const toRead = isTransformStream(tail) ? tail.readable : tail;

        eos(toRead, function () {
            if (onfinish) {
                const cb = onfinish;
                onfinish = null;
                cb();
            }
        });
    }

    if (readable) {
        if (isNodeStream(tail)) {
            tail.on("readable", function () {
                if (onreadable) {
                    const cb = onreadable;
                    onreadable = null;
                    cb();
                }
            });

            tail.on("end", function () {
                d.push(null);
            });

            d._read = function () {
                while (true) {
                    const buf = tail.read();

                    if (buf === null) {
                        onreadable = d._read;
                        return;
                    }

                    if (!d.push(buf)) {
                        return;
                    }
                }
            };
        } else if (isWebStream(tail)) {
            const wsReadable = isTransformStream(tail) ? tail.readable : tail;
            const reader = wsReadable.getReader();

            d._read = async function () {
                while (true) {
                    try {
                        const { value, done } = await reader.read();

                        if (!d.push(value)) {
                            return;
                        }

                        if (done) {
                            d.push(null);
                            return;
                        }
                    } catch {
                        return;
                    }
                }
            };
        }
    }

    d._destroy = function (err, callback) {
        if (!err && onclose !== null) {
            err = new AbortError();
        }

        onreadable = null;
        ondrain = null;
        onfinish = null;

        if (isNodeStream(tail)) {
            destroyer(tail, err);
        }

        if (onclose === null) {
            callback(err);
        } else {
            onclose = callback;
        }
    };

    return d;
}

export default compose;