jsmpi 0.1.0

A browser-oriented MPI compatibility layer for Rust/WASM using Web Workers
Documentation
// Envelopes accepted by the "jsmpi:deliver" handler, kept in arrival order until a receive call claims them.
const inbox = [];
// Slot of `controlSignals` that wake() bumps whenever an envelope is queued.
const RECV_INDEX = 0;
// Slot of `controlSignals` that wake() bumps when a "jsmpi:barrier-release" message arrives.
const BARRIER_INDEX = 1;
// Envelopes whose `protocol_version` differs from this value fail validation.
const PROTOCOL_VERSION = 1;
// Int32Array view over the control buffer supplied by "jsmpi:init"; null before init or when no buffer is supplied.
let controlSignals = null;

/**
 * Extract a leading error code from a message of the form "CODE: details".
 *
 * The candidate is everything before the first ":" (or the whole text when
 * there is no colon), trimmed. It is accepted only when it consists solely of
 * uppercase ASCII letters and underscores and contains at least one underscore.
 *
 * @param {*} text - Message to inspect; falsy values are treated as "".
 * @returns {string|null} The code, or null when the prefix is not code-shaped.
 */
function parseErrorCode(text) {
  const raw = String(text || "");
  const colonAt = raw.indexOf(":");
  const head = (colonAt === -1 ? raw : raw.slice(0, colonAt)).trim();
  const looksLikeCode = /^[A-Z_]+$/.test(head) && head.includes("_");
  return looksLikeCode ? head : null;
}

/**
 * Post a message to the worker's owner, stamped with this worker's job id and rank.
 *
 * @param {string} type - Message type, e.g. "jsmpi:log".
 * @param {object} [payload={}] - Extra fields merged into the message; keys here
 *   override the default `type`, `jobId` and `rank` fields.
 */
function postStructured(type, payload = {}) {
  const header = {
    type,
    jobId: globalThis.__jsmpi_job_id,
    rank: globalThis.__jsmpi_rank,
  };
  globalThis.postMessage({ ...header, ...payload });
}

/**
 * Report whether `value` is a finite number with no fractional part.
 *
 * @param {*} value - Any value; non-numbers (including numeric strings and BigInts) yield false.
 * @returns {boolean}
 */
function isInteger(value) {
  return typeof value === "number" && Number.isFinite(value) && Math.trunc(value) === value;
}

/**
 * Check that `envelope` has the shape this worker accepts.
 *
 * An envelope is valid when it is a non-null object whose `protocol_version`
 * equals PROTOCOL_VERSION, whose `src`, `dst` and `tag` are integers with
 * `src` and `dst` non-negative, and whose `payload` is a plain Array.
 *
 * @param {*} envelope - Candidate envelope.
 * @returns {boolean} true when every check passes.
 */
function isValidEnvelope(envelope) {
  if (!envelope || typeof envelope !== "object") {
    return false;
  }

  // Checks run left to right and stop at the first failure.
  return (
    envelope.protocol_version === PROTOCOL_VERSION &&
    isInteger(envelope.src) &&
    isInteger(envelope.dst) &&
    isInteger(envelope.tag) &&
    envelope.src >= 0 &&
    envelope.dst >= 0 &&
    Array.isArray(envelope.payload)
  );
}

/**
 * Test an envelope against optional source and tag filters.
 *
 * @param {{src: *, tag: *}} envelope - Envelope to test.
 * @param {*} source - Required `src`; null/undefined accepts any source.
 * @param {*} tag - Required `tag`; null/undefined accepts any tag.
 * @returns {boolean} true when every supplied filter strictly equals the envelope's field.
 */
function matches(envelope, source, tag) {
  if (source != null && envelope.src !== source) {
    return false;
  }
  if (tag != null && envelope.tag !== tag) {
    return false;
  }
  return true;
}

/**
 * Remove and return the oldest inbox envelope that satisfies the filters.
 *
 * @param {*} source - Source filter passed to matches(); null/undefined accepts any.
 * @param {*} tag - Tag filter passed to matches(); null/undefined accepts any.
 * @returns {object|null} The removed envelope, or null when nothing matches.
 */
function takeMatchingEnvelope(source, tag) {
  for (let position = 0; position < inbox.length; position += 1) {
    if (matches(inbox[position], source, tag)) {
      const [taken] = inbox.splice(position, 1);
      return taken ?? null;
    }
  }

  return null;
}

/**
 * Bump the counter in the given control slot and notify one waiter on it.
 * Does nothing when no control buffer has been configured.
 *
 * @param {number} index - Slot in `controlSignals` (e.g. RECV_INDEX or BARRIER_INDEX).
 */
function wake(index) {
  if (controlSignals) {
    // Change the value first so a waiter comparing against the old value sees it differ.
    Atomics.add(controlSignals, index, 1);
    Atomics.notify(controlSignals, index, 1);
  }
}

/**
 * Render console arguments as a single space-separated line of text.
 *
 * Strings pass through unchanged. Error objects are rendered with String()
 * ("Name: message"), because JSON would serialize them as "{}". Everything
 * else is JSON-serialized, falling back to String() when serialization throws
 * (cycles, BigInt) or yields no text at all.
 *
 * @param {Array<*>} args - Arguments as passed to a console method.
 * @returns {string} The formatted line; "" for an empty argument list.
 */
function formatArgs(args) {
  return args
    .map((value) => {
      if (typeof value === "string") {
        return value;
      }
      if (value instanceof Error) {
        return String(value);
      }
      try {
        // JSON.stringify returns undefined (not a string) for undefined,
        // functions and symbols; join() would then render them as "".
        return JSON.stringify(value) ?? String(value);
      } catch {
        return String(value);
      }
    })
    .join(" ");
}
// Bound copies of the native console methods, captured before they are replaced below.
const originalConsole = Object.fromEntries(
  ["debug", "info", "log", "trace", "warn", "error"].map((level) => [
    level,
    console[level].bind(console),
  ]),
);

// Replace each console method with a wrapper that first forwards the formatted
// text as a "jsmpi:log" message and then calls the saved native method.
Object.keys(originalConsole).forEach((level) => {
  console[level] = (...args) => {
    const text = formatArgs(args);
    postStructured("jsmpi:log", { level, text, phase: "runtime", event: "console" });
    originalConsole[level](...args);
  };
});

// Report uncaught errors to the owner as "jsmpi:error" messages.
globalThis.addEventListener("error", (event) => {
  // `event.message` is commonly prefixed by the browser (e.g. "Uncaught Error: "),
  // which hides a code at the start of the thrown message. When the thrown value
  // is available on `event.error`, use it as a second source for the code.
  const thrown = event.error;
  let thrownMessage = "";
  if (thrown instanceof Error) {
    thrownMessage = thrown.message;
  } else if (typeof thrown === "string") {
    thrownMessage = thrown;
  }

  postStructured("jsmpi:error", {
    text: event.message,
    errorCode:
      parseErrorCode(event.message) || parseErrorCode(thrownMessage) || "WORKER_RUNTIME_ERROR",
    phase: "runtime",
    event: "error",
  });
});

/**
 * Hand an outbound envelope to the owner for routing.
 *
 * A valid envelope is posted as "jsmpi:send". An invalid one is not sent;
 * a "jsmpi:error" with code PROTO_INVALID_ENVELOPE is posted instead.
 *
 * @param {object} envelope - Envelope to send; checked with isValidEnvelope().
 * @returns {undefined}
 */
globalThis.__jsmpi_send = function __jsmpi_send(envelope) {
  if (isValidEnvelope(envelope)) {
    postStructured("jsmpi:send", { envelope, phase: "transport", event: "send" });
    return;
  }

  postStructured("jsmpi:error", {
    text: "PROTO_INVALID_ENVELOPE: outbound envelope validation failed",
    errorCode: "PROTO_INVALID_ENVELOPE",
    phase: "protocol",
    event: "send_validate",
  });
};

/**
 * Non-blocking receive: take the oldest queued envelope matching the filters.
 *
 * @param {*} source - Source filter; null/undefined accepts any source.
 * @param {*} tag - Tag filter; null/undefined accepts any tag.
 * @returns {object|null} The envelope, or null when none is queued.
 */
globalThis.__jsmpi_recv = function __jsmpi_recv(source, tag) {
  const envelope = takeMatchingEnvelope(source, tag);
  return envelope;
};

/**
 * Blocking receive: return the oldest queued envelope matching the filters,
 * sleeping on the RECV_INDEX control slot between attempts.
 *
 * @param {*} source - Source filter; null/undefined accepts any source.
 * @param {*} tag - Tag filter; null/undefined accepts any tag.
 * @returns {object|null} The envelope, or null when nothing matches and no
 *   control buffer is configured (there is then nothing to wait on).
 *
 * NOTE(review): `inbox` is only filled by this worker's "message" listener.
 * Confirm that listener can actually run while this loop is parked in
 * Atomics.wait on the same thread.
 */
globalThis.__jsmpi_recv_blocking = function __jsmpi_recv_blocking(source, tag) {
  while (true) {
    const envelope = takeMatchingEnvelope(source, tag);
    if (envelope) {
      return envelope;
    }

    // Without a control buffer there is no signal to sleep on, so give up.
    if (!controlSignals) {
      return null;
    }

    // Snapshot the wake counter, then look at the inbox once more before
    // sleeping. Atomics.wait only blocks while the slot still equals
    // `version`, so a wake() issued after this load ends the wait at once.
    const version = Atomics.load(controlSignals, RECV_INDEX);
    const afterCheck = takeMatchingEnvelope(source, tag);
    if (afterCheck) {
      return afterCheck;
    }

    // No timeout: sleeps until the slot changes or a notify arrives, then retries.
    Atomics.wait(controlSignals, RECV_INDEX, version);
  }
};

/**
 * Announce arrival at a barrier and block until it is released or the timeout elapses.
 *
 * @param {*} rank - Rank reported in the "jsmpi:barrier" message.
 * @param {*} size - Communicator size reported in the "jsmpi:barrier" message.
 * @param {*} timeoutMs - Maximum wait in milliseconds; any non-number means wait without limit.
 * @returns {boolean} false when the wait timed out; true otherwise, including
 *   when no control buffer is configured and no wait takes place.
 */
globalThis.__jsmpi_barrier_timeout = function __jsmpi_barrier_timeout(rank, size, timeoutMs) {
  // Read the barrier counter BEFORE announcing arrival, so a release that
  // lands right after the announcement already differs from `observed`.
  let observed = 0;
  if (controlSignals) {
    observed = Atomics.load(controlSignals, BARRIER_INDEX);
  }

  postStructured("jsmpi:barrier", { rank, size, phase: "collective", event: "barrier_wait" });

  if (controlSignals) {
    const limit = typeof timeoutMs === "number" ? timeoutMs : undefined;
    const outcome = Atomics.wait(controlSignals, BARRIER_INDEX, observed, limit);
    return outcome !== "timed-out";
  }

  return true;
};

/**
 * Announce arrival at a barrier without blocking.
 *
 * @param {*} rank - Rank reported in the "jsmpi:barrier" message.
 * @param {*} size - Communicator size reported in the "jsmpi:barrier" message.
 * @returns {number} The barrier counter as read before the announcement
 *   (0 when no control buffer is configured); pass it to __jsmpi_barrier_ready.
 */
globalThis.__jsmpi_barrier_begin = function __jsmpi_barrier_begin(rank, size) {
  let version = 0;
  if (controlSignals) {
    version = Atomics.load(controlSignals, BARRIER_INDEX);
  }

  postStructured("jsmpi:barrier", { rank, size, phase: "collective", event: "barrier_begin" });
  return version;
};

/**
 * Poll whether the barrier counter has moved past a previously observed value.
 *
 * @param {number} version - Counter value returned by __jsmpi_barrier_begin.
 * @returns {boolean} true when the counter now differs from `version`, or
 *   when no control buffer is configured.
 */
globalThis.__jsmpi_barrier_ready = function __jsmpi_barrier_ready(version) {
  return !controlSignals || Atomics.load(controlSignals, BARRIER_INDEX) !== version;
};

/**
 * Tell the owner that this rank has finished by posting "jsmpi:finished".
 *
 * @returns {undefined}
 */
globalThis.__jsmpi_mark_finished = function __jsmpi_mark_finished() {
  const rank = globalThis.__jsmpi_rank;
  postStructured("jsmpi:finished", { rank, phase: "lifecycle", event: "finished" });
};

// Dispatch messages from the owner:
//   "jsmpi:init"            - record job/rank settings, attach the control buffer, load and run the module
//   "jsmpi:deliver"         - validate and queue an inbound envelope, then wake a receiver
//   "jsmpi:barrier-release" - wake a barrier waiter
// Any other message type is ignored.
globalThis.addEventListener("message", async (event) => {
  const message = event.data ?? {};

  if (message.type === "jsmpi:init") {
    globalThis.__jsmpi_job_id = message.jobId || "job-unknown";
    globalThis.__jsmpi_rank = message.rank;
    globalThis.__jsmpi_size = message.size;
    // Non-finite or missing tuning values are stored as undefined.
    globalThis.__jsmpi_collective_timeout_ms = Number.isFinite(message.collectiveTimeoutMs)
      ? message.collectiveTimeoutMs
      : undefined;
    globalThis.__jsmpi_collective_max_retries = Number.isFinite(message.collectiveMaxRetries)
      ? message.collectiveMaxRetries
      : undefined;
    controlSignals = message.controlBuffer ? new Int32Array(message.controlBuffer) : null;
    postStructured("jsmpi:ready", { rank: message.rank, phase: "startup", event: "ready" });

    try {
      if (message.moduleUrl) {
        const mod = await import(message.moduleUrl);
        if (typeof mod.default === "function") {
          await mod.default(message.wasmUrl);
        }
        if (typeof mod.jsmpi_main === "function") {
          const maybeResult = mod.jsmpi_main();
          if (maybeResult && typeof maybeResult.then === "function") {
            await maybeResult;
          }
          // No "jsmpi:finished" is posted on this path; presumably the module
          // reports completion itself via __jsmpi_mark_finished — confirm.
          return;
        }
      }
      // No module, or a module without `jsmpi_main`: report completion right away.
      postStructured("jsmpi:finished", { rank: message.rank, phase: "lifecycle", event: "finished" });
    } catch (error) {
      const isError = error instanceof Error;
      const text = isError ? error.stack ?? error.message : String(error);
      // Look for the code in the bare message first. Stack strings are
      // engine-specific: they typically start with the error name
      // ("Error: ...") or omit the message, which would hide a leading code.
      const errorCode =
        (isError ? parseErrorCode(error.message) : null) ||
        parseErrorCode(text) ||
        "WORKER_RUNTIME_ERROR";
      postStructured("jsmpi:error", {
        text,
        errorCode,
        phase: "runtime",
        event: "module_execute",
      });
    }
    return;
  }

  if (message.type === "jsmpi:deliver" && message.envelope) {
    if (!isValidEnvelope(message.envelope)) {
      postStructured("jsmpi:error", {
        text: "PROTO_INVALID_ENVELOPE: inbound envelope validation failed",
        errorCode: "PROTO_INVALID_ENVELOPE",
        phase: "protocol",
        event: "deliver_validate",
      });
      return;
    }
    // Queue first, then signal, so a woken receiver finds the envelope.
    inbox.push(message.envelope);
    wake(RECV_INDEX);
    return;
  }

  if (message.type === "jsmpi:barrier-release") {
    wake(BARRIER_INDEX);
  }
});