'use strict';
const fs = require('fs');
const http = require('http');
const https = require('https');
const zlib = require('zlib');
const wasm = require('./nodejs/bgpkit_parser.js');
function parseOpenBmpMessage(data) {
const json = wasm.parseOpenBmpMessage(data);
return json ? JSON.parse(json) : null;
}
function parseBmpMessage(data, timestamp) {
return JSON.parse(wasm.parseBmpMessage(data, timestamp));
}
function parseBgpUpdate(data) {
return JSON.parse(wasm.parseBgpUpdate(data));
}
const MRT_HEADER_LEN = 12;
function mrtRecordSize(data, offset) {
if (offset + MRT_HEADER_LEN > data.length) return -1;
const bodyLen =
(data[offset + 8] << 24) |
(data[offset + 9] << 16) |
(data[offset + 10] << 8) |
data[offset + 11];
return MRT_HEADER_LEN + (bodyLen >>> 0);
}
function parseMrtRecord(data) {
const size = mrtRecordSize(data, 0);
if (size < 0 || size > data.length) return null;
const recordBytes = data.subarray(0, size);
const json = wasm.parseMrtRecord(recordBytes);
if (!json) return null;
const result = JSON.parse(json);
result.bytesRead = size;
return result;
}
function resetMrtParser() {
wasm.resetMrtParser();
}
function* parseMrtRecords(data) {
wasm.resetMrtParser();
let offset = 0;
while (offset < data.length) {
const size = mrtRecordSize(data, offset);
if (size < 0 || offset + size > data.length) break;
const recordBytes = data.subarray(offset, offset + size);
const json = wasm.parseMrtRecord(recordBytes);
if (!json) break;
const result = JSON.parse(json);
result.bytesRead = size;
yield result;
offset += size;
}
}
let _bz2Module = undefined; function getBz2() {
if (_bz2Module !== undefined) return _bz2Module;
for (const name of ['unbzip2-stream', 'seek-bzip', 'bz2']) {
try {
_bz2Module = { name, mod: require(name) };
return _bz2Module;
} catch {}
}
_bz2Module = null;
return null;
}
function detectCompression(pathOrUrl) {
const name = pathOrUrl.split('?')[0].split('#')[0]; if (name.endsWith('.gz')) return 'gz';
if (name.endsWith('.bz2')) return 'bz2';
if (name.endsWith('.xz')) return 'xz';
return 'none';
}
function httpGet(url) {
return new Promise((resolve, reject) => {
const lib = url.startsWith('https') ? https : http;
lib
.get(url, (res) => {
if (res.statusCode === 301 || res.statusCode === 302) {
httpGet(res.headers.location).then(resolve, reject);
return;
}
if (res.statusCode !== 200) {
reject(new Error(`HTTP ${res.statusCode} for ${url}`));
return;
}
resolve(res);
})
.on('error', reject);
});
}
function collectStream(stream) {
return new Promise((resolve, reject) => {
const chunks = [];
stream.on('data', (chunk) => chunks.push(chunk));
stream.on('end', () => resolve(Buffer.concat(chunks)));
stream.on('error', reject);
});
}
function decompressSync(buf, compression) {
if (compression === 'gz') {
return zlib.gunzipSync(buf);
}
if (compression === 'bz2') {
const bz2 = getBz2();
if (!bz2) {
throw new Error(
'bzip2 decompression requires an optional dependency. ' +
'Install one of: npm install unbzip2-stream, npm install seek-bzip, npm install bz2'
);
}
if (bz2.name === 'seek-bzip') {
return Buffer.from(bz2.mod.decode(buf));
}
if (bz2.name === 'bz2') {
const decompress = bz2.mod.decompress || bz2.mod;
return Buffer.from(decompress(buf));
}
throw new Error(
'unbzip2-stream does not support sync decompression. ' +
'Use streamMrtFrom() instead, or install seek-bzip or bz2.'
);
}
if (compression === 'xz') {
throw new Error('xz decompression is not yet supported in the WASM package');
}
return buf;
}
function decompressStream(compression) {
if (compression === 'gz') {
return zlib.createGunzip();
}
if (compression === 'bz2') {
const bz2 = getBz2();
if (!bz2) {
throw new Error(
'bzip2 decompression requires an optional dependency. ' +
'Install one of: npm install unbzip2-stream, npm install seek-bzip, npm install bz2'
);
}
if (bz2.name === 'unbzip2-stream') {
return bz2.mod();
}
return null;
}
return null;
}
async function openMrt(pathOrUrl) {
const compression = detectCompression(pathOrUrl);
const isUrl =
pathOrUrl.startsWith('http://') || pathOrUrl.startsWith('https://');
if (isUrl) {
const rawStream = await httpGet(pathOrUrl);
const decomp = decompressStream(compression);
if (decomp) {
return collectStream(rawStream.pipe(decomp));
}
const raw = await collectStream(rawStream);
return decompressSync(raw, compression);
}
const raw = fs.readFileSync(pathOrUrl);
return decompressSync(raw, compression);
}
async function* streamMrtFrom(pathOrUrl) {
const raw = await openMrt(pathOrUrl);
yield* parseMrtRecords(raw);
}
module.exports = {
parseOpenBmpMessage,
parseBmpMessage,
parseBgpUpdate,
parseMrtRecords,
parseMrtRecord,
resetMrtParser,
openMrt,
streamMrtFrom,
};