/**
 * Pool of module Web Workers that each receive an `init` message (WASM URL
 * plus index options) and then serve request/response style calls.
 *
 * Wire format, as used by this class:
 *   - request  -> worker: `{ type, data: { ...payload, requestId } }`
 *   - response <- worker: `{ type, requestId, data, error }`, where
 *     `type === 'error'` marks a failed request.
 *
 * NOTE(review): the worker script is not part of this file; the response
 * shape above is what `handleMessage` reads — confirm it matches the worker.
 */
export class WorkerPool {
  /**
   * @param {string|URL} workerUrl - Script URL passed to `new Worker(...)`.
   * @param {string} wasmUrl - Forwarded to every worker in its `init` message.
   * @param {object} [options]
   * @param {number} [options.poolSize] - Worker count; defaults to
   *   `navigator.hardwareConcurrency` when available, otherwise 4.
   * @param {number} [options.requestTimeout=30000] - Per-request timeout in ms.
   * @param {*} [options.dimensions] - Forwarded to workers on init.
   * @param {*} [options.metric] - Forwarded to workers on init.
   * @param {*} [options.useHnsw] - Forwarded to workers on init.
   */
  constructor(workerUrl, wasmUrl, options = {}) {
    this.workerUrl = workerUrl;
    this.wasmUrl = wasmUrl;
    // `navigator` is not defined in every runtime; a bare reference would throw.
    const detectedCores =
      typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    this.poolSize = options.poolSize || detectedCores || 4;
    this.requestTimeout =
      Number.isFinite(options.requestTimeout) && options.requestTimeout > 0
        ? options.requestTimeout
        : 30000;
    this.workers = [];
    this.nextWorker = 0;
    this.pendingRequests = new Map();
    this.requestId = 0;
    this.initialized = false;
    // Shared by concurrent init() callers so workers are only spawned once.
    this.initPromise = null;
    this.options = options;
  }

  /**
   * Spawns the workers and waits until each one has answered its `init`
   * request. Safe to call repeatedly and concurrently: callers that arrive
   * while initialization is running share the same in-flight promise.
   *
   * @returns {Promise<void>}
   * @throws Rejects with the first init failure; the pool is torn down so a
   *   later call can retry from a clean state.
   */
  async init() {
    if (this.initialized) return;
    if (!this.initPromise) {
      this.initPromise = this.startWorkers();
    }
    const inFlight = this.initPromise;
    try {
      await inFlight;
    } finally {
      // Only clear the promise we awaited; a newer attempt may have replaced it.
      if (this.initPromise === inFlight) {
        this.initPromise = null;
      }
    }
  }

  /**
   * Creates `poolSize` workers and sends each its `init` message.
   * Internal helper for `init()`.
   *
   * @returns {Promise<void>}
   */
  async startWorkers() {
    console.log(`Initializing worker pool with ${this.poolSize} workers...`);
    try {
      const initPromises = [];
      for (let i = 0; i < this.poolSize; i++) {
        const worker = new Worker(this.workerUrl, { type: 'module' });
        worker.onmessage = (e) => this.handleMessage(i, e);
        worker.onerror = (error) => this.handleError(i, error);
        this.workers.push({
          worker,
          busy: false,
          inFlight: 0,
          id: i
        });
        initPromises.push(
          this.sendToWorker(i, 'init', {
            wasmUrl: this.wasmUrl,
            dimensions: this.options.dimensions,
            metric: this.options.metric,
            useHnsw: this.options.useHnsw
          })
        );
      }
      await Promise.all(initPromises);
      this.initialized = true;
      console.log(`Worker pool initialized successfully`);
    } catch (err) {
      // Do not leave a half-started pool behind: stop every worker that was
      // created and fail whatever is still waiting on them.
      this.releaseWorkers(err);
      throw err;
    }
  }

  /**
   * Removes a request from the pending map, cancels its timeout timer and
   * updates the owning worker's in-flight bookkeeping.
   *
   * @param {number} requestId
   * @returns {object|undefined} The pending record, or `undefined` when the
   *   request is unknown or was already settled.
   */
  settleRequest(requestId) {
    const request = this.pendingRequests.get(requestId);
    if (!request) return undefined;
    this.pendingRequests.delete(requestId);
    clearTimeout(request.timer);
    const entry = this.workers[request.workerId];
    if (entry) {
      entry.inFlight = Math.max(0, entry.inFlight - 1);
      // A worker may have several queued requests; it is idle only at zero.
      entry.busy = entry.inFlight > 0;
    }
    return request;
  }

  /**
   * `onmessage` handler: resolves or rejects the pending request named by
   * the message's `requestId`. Messages for unknown requests are ignored.
   *
   * @param {number} workerId - Index of the worker that posted the message.
   * @param {MessageEvent} event
   */
  handleMessage(workerId, event) {
    const { type, requestId, data, error } = event.data || {};
    const request = this.settleRequest(requestId);
    if (!request) return;
    if (type === 'error') {
      // Tolerate string or missing error payloads instead of throwing here.
      const message =
        error && error.message !== undefined
          ? error.message
          : String(error || 'Unknown worker error');
      request.reject(new Error(message));
      return;
    }
    request.resolve(data);
  }

  /**
   * `onerror` handler: logs the error and rejects every request currently
   * pending on that worker with it.
   *
   * @param {number} workerId
   * @param {*} error - Value delivered to the worker's `onerror` callback.
   */
  handleError(workerId, error) {
    console.error(`Worker ${workerId} error:`, error);
    // Iterate over a snapshot because settling mutates the map.
    for (const [requestId, request] of [...this.pendingRequests]) {
      if (request.workerId === workerId) {
        this.settleRequest(requestId);
        request.reject(error);
      }
    }
  }

  /**
   * Picks the worker for the next request: the first idle worker scanning
   * round-robin from `nextWorker`, or plain round-robin when all are busy.
   *
   * @returns {number} Index into `this.workers`.
   */
  getNextWorker() {
    const count = this.workers.length;
    if (count === 0) return 0;
    for (let offset = 0; offset < count; offset++) {
      const idx = (this.nextWorker + offset) % count;
      if (!this.workers[idx].busy) {
        this.nextWorker = (idx + 1) % count;
        return idx;
      }
    }
    const idx = this.nextWorker % count;
    this.nextWorker = (idx + 1) % count;
    return idx;
  }

  /**
   * Posts one request to a specific worker and returns a promise for its
   * reply.
   *
   * @param {number} workerId - Index into `this.workers`.
   * @param {string} type - Message type.
   * @param {object} data - Payload; `requestId` is added to a shallow copy.
   * @returns {Promise<*>} Resolves with the response's `data`.
   * @throws Rejects on an error response, a worker `onerror`, pool
   *   termination, a `postMessage` failure, a missing worker, or with
   *   `Error('Request timeout')` after `requestTimeout` ms.
   */
  sendToWorker(workerId, type, data) {
    return new Promise((resolve, reject) => {
      const entry = this.workers[workerId];
      if (!entry) {
        reject(new Error(`Worker ${workerId} is not available`));
        return;
      }
      const requestId = this.requestId++;
      const timer = setTimeout(() => {
        const request = this.settleRequest(requestId);
        if (request) {
          request.reject(new Error('Request timeout'));
        }
      }, this.requestTimeout);
      this.pendingRequests.set(requestId, {
        resolve,
        reject,
        workerId,
        timer,
        timestamp: Date.now()
      });
      entry.inFlight += 1;
      entry.busy = true;
      try {
        entry.worker.postMessage({
          type,
          data: { ...data, requestId }
        });
      } catch (err) {
        // e.g. an uncloneable payload: undo the bookkeeping before failing.
        this.settleRequest(requestId);
        reject(err);
      }
    });
  }

  /**
   * Initializes the pool if needed, then sends the request to the worker
   * chosen by `getNextWorker()`.
   *
   * @param {string} type
   * @param {object} data
   * @returns {Promise<*>}
   */
  async execute(type, data) {
    await this.init();
    const workerId = this.getNextWorker();
    return this.sendToWorker(workerId, type, data);
  }

  /** Sends an `insert` request with `{ vector, id, metadata }`. */
  async insert(vector, id = null, metadata = null) {
    return this.execute('insert', { vector, id, metadata });
  }

  /**
   * Splits `entries` into at most one contiguous chunk per worker, sends
   * each chunk as an `insertBatch` request and flattens the replies one
   * level, in chunk order.
   *
   * @param {Array} entries
   * @returns {Promise<Array>}
   */
  async insertBatch(entries) {
    await this.init();
    const workerCount = this.workers.length;
    const chunkSize = Math.ceil(entries.length / workerCount);
    const chunks = [];
    for (let i = 0; i < entries.length; i += chunkSize) {
      chunks.push(entries.slice(i, i + chunkSize));
    }
    const results = await Promise.all(
      chunks.map((chunk, i) =>
        this.sendToWorker(i % workerCount, 'insertBatch', { entries: chunk })
      )
    );
    return results.flat();
  }

  /** Sends a `search` request with `{ query, k, filter }`. */
  async search(query, k = 10, filter = null) {
    return this.execute('search', { query, k, filter });
  }

  /**
   * Sends one `search` request per query, assigning queries to workers
   * round-robin by index.
   *
   * @param {Array} queries
   * @param {number} [k=10]
   * @param {*} [filter=null]
   * @returns {Promise<Array>} Replies in the same order as `queries`.
   */
  async searchBatch(queries, k = 10, filter = null) {
    await this.init();
    const workerCount = this.workers.length;
    return Promise.all(
      queries.map((query, i) =>
        this.sendToWorker(i % workerCount, 'search', { query, k, filter })
      )
    );
  }

  /** Sends a `delete` request with `{ id }`. */
  async delete(id) {
    return this.execute('delete', { id });
  }

  /** Sends a `get` request with `{ id }`. */
  async get(id) {
    return this.execute('get', { id });
  }

  /**
   * Sends a `len` request to worker 0.
   *
   * NOTE(review): only worker 0 is asked; whether its answer covers data
   * handled by the other workers depends on the worker script — confirm.
   */
  async len() {
    await this.init();
    return this.sendToWorker(0, 'len', {});
  }

  /**
   * Terminates every worker, rejects all pending requests with `reason`
   * and resets the pool so `init()` can be called again.
   * Internal helper for `terminate()` and failed initialization.
   *
   * @param {*} reason - Rejection value for pending requests.
   */
  releaseWorkers(reason) {
    for (const { worker } of this.workers) {
      worker.terminate();
    }
    this.workers = [];
    this.nextWorker = 0;
    this.initialized = false;
    for (const request of [...this.pendingRequests.values()]) {
      clearTimeout(request.timer);
      request.reject(reason);
    }
    this.pendingRequests.clear();
  }

  /**
   * Shuts the pool down. Requests still pending are rejected immediately
   * instead of lingering until their timeout.
   */
  terminate() {
    this.releaseWorkers(new Error('Worker pool terminated'));
    console.log('Worker pool terminated');
  }

  /**
   * @returns {{poolSize: number, busyWorkers: number, idleWorkers: number,
   *   pendingRequests: number}} Snapshot of the pool's current load.
   */
  getStats() {
    const busyWorkers = this.workers.filter((w) => w.busy).length;
    return {
      poolSize: this.poolSize,
      busyWorkers,
      idleWorkers: this.workers.length - busyWorkers,
      pendingRequests: this.pendingRequests.size
    };
  }
}
// Default export alongside the named `WorkerPool` export, so both
// `import WorkerPool from ...` and `import { WorkerPool } from ...` work.
export default WorkerPool;