const Pusher = require('pusher-js');
const DeltaAlgorithms = {
FOSSIL: 'fossil',
XDELTA3: 'xdelta3'
};
class DeltaCompressionManager {
constructor() {
this.channelCaches = new Map();
this.stats = {
fullMessages: 0,
deltaMessages: 0,
bytesReceived: 0,
bytesReconstructed: 0,
errors: 0
};
}
handleCacheSync(channel, data) {
console.log(`📦 Cache sync received for channel: ${channel}`);
const { conflation_key, max_messages_per_key, states } = data;
const cache = {
conflationKey: conflation_key,
maxMessagesPerKey: max_messages_per_key,
states: new Map()
};
for (const [key, messages] of Object.entries(states)) {
const messageQueue = messages.map(msg => ({
content: JSON.parse(msg.content),
sequence: msg.seq
}));
cache.states.set(key, messageQueue);
console.log(` - Loaded ${messageQueue.length} cached messages for key: ${key}`);
}
this.channelCaches.set(channel, cache);
console.log(`✅ Cache initialized with ${cache.states.size} conflation groups`);
}
processMessage(channel, event, data) {
if (data.__delta) {
return this.reconstructDelta(channel, data);
} else if (data.__delta_seq !== undefined) {
this.updateCache(channel, data);
return data;
} else {
return data;
}
}
reconstructDelta(channel, deltaData) {
this.stats.deltaMessages++;
const {
__delta: deltaBase64,
__delta_seq: sequence,
__delta_algorithm: algorithm,
__conflation_key: conflationKey,
__base_index: baseIndex,
...metadata
} = deltaData;
try {
const baseMessage = this.getBaseMessage(channel, conflationKey, baseIndex);
if (!baseMessage) {
console.error(`❌ Base message not found for channel ${channel}, key ${conflationKey}, index ${baseIndex}`);
this.stats.errors++;
return null;
}
const deltaBytes = Buffer.from(deltaBase64, 'base64');
const reconstructed = this.applyDelta(
JSON.stringify(baseMessage.content),
deltaBytes,
algorithm
);
const fullMessage = JSON.parse(reconstructed);
this.updateCache(channel, { ...fullMessage, __delta_seq: sequence, __conflation_key: conflationKey });
this.stats.bytesReceived += deltaBase64.length;
this.stats.bytesReconstructed += reconstructed.length;
return fullMessage;
} catch (error) {
console.error(`❌ Failed to reconstruct delta:`, error);
this.stats.errors++;
return null;
}
}
getBaseMessage(channel, conflationKey, baseIndex) {
const cache = this.channelCaches.get(channel);
if (!cache) return null;
const stateKey = conflationKey || '';
const messages = cache.states.get(stateKey);
if (!messages || baseIndex >= messages.length) return null;
return messages[baseIndex];
}
updateCache(channel, data) {
const cache = this.channelCaches.get(channel);
if (!cache) return;
const conflationKey = data.__conflation_key || '';
const sequence = data.__delta_seq;
const { __delta_seq, __delta_full, __conflation_key, ...content } = data;
let messages = cache.states.get(conflationKey);
if (!messages) {
messages = [];
cache.states.set(conflationKey, messages);
}
messages.push({ content, sequence });
while (messages.length > cache.maxMessagesPerKey) {
messages.shift();
}
this.stats.fullMessages++;
}
applyDelta(baseString, deltaBytes, algorithm) {
console.warn(`⚠️ Delta reconstruction not fully implemented - would use ${algorithm} algorithm`);
console.warn(` Base size: ${baseString.length} bytes, Delta size: ${deltaBytes.length} bytes`);
throw new Error('Delta reconstruction requires native modules (fossil-delta or xdelta3)');
}
getStats() {
const compressionRatio = this.stats.bytesReconstructed > 0
? ((1 - this.stats.bytesReceived / this.stats.bytesReconstructed) * 100).toFixed(1)
: 0;
return {
...this.stats,
compressionRatio: `${compressionRatio}%`
};
}
clearChannelCache(channel) {
this.channelCaches.delete(channel);
console.log(`🗑️ Cleared cache for channel: ${channel}`);
}
}
async function main() {
console.log('🚀 Sockudo Delta Compression Client Example\n');
const pusher = new Pusher('app-key', {
wsHost: 'localhost',
wsPort: 6001,
forceTLS: false,
disableStats: true,
enabledTransports: ['ws']
});
const deltaManager = new DeltaCompressionManager();
pusher.connection.bind('connected', () => {
console.log('✅ Connected to Sockudo\n');
console.log('📤 Enabling delta compression...');
pusher.send_event('pusher:enable_delta_compression', {});
console.log('✅ Delta compression enabled\n');
});
console.log('📡 Subscribing to channel: market-data\n');
const channel = pusher.subscribe('market-data');
channel.bind('pusher:delta_cache_sync', (data) => {
deltaManager.handleCacheSync('market-data', JSON.parse(data));
});
channel.bind('price-update', (data) => {
const fullMessage = deltaManager.processMessage('market-data', 'price-update', data);
if (fullMessage) {
console.log(`💰 Price Update:`, {
asset: fullMessage.asset,
price: fullMessage.price,
volume: fullMessage.volume
});
}
});
channel.bind('pusher:subscription_succeeded', () => {
console.log('✅ Subscribed to market-data channel\n');
console.log('Waiting for messages...\n');
});
channel.bind('pusher:error', (error) => {
console.error('❌ Error:', error);
});
setInterval(() => {
const stats = deltaManager.getStats();
if (stats.fullMessages > 0 || stats.deltaMessages > 0) {
console.log('\n📊 Statistics:', stats);
}
}, 5000);
process.on('SIGINT', () => {
console.log('\n\n👋 Shutting down...');
console.log('\n📊 Final Statistics:', deltaManager.getStats());
pusher.disconnect();
process.exit(0);
});
}
main().catch(console.error);