var sys = require('util');
var stomp = require('stomp');
// Connection options handed to every stomp.Stomp constructor in this script.
var stomp_args = {
  port: 61613,
  // Broker host: the first command-line argument when present, else localhost.
  host: process.argv[2] || 'localhost',
  debug: false,
  // Initial login value; it is reassigned before each client is constructed.
  login: "2sub",
  passcode: "passcode"
};

// Destination used by both subscriptions and by the published message.
var destination = "memtop-a";

// Body the publisher sends and each subscriber compares its message against.
var testMessage = "Testing01\n";
/**
 * Shared failure path for all three clients.
 *
 * Logs the frame's headers and body, then — two seconds later — disconnects
 * the publisher and both subscribers and exits the process with status 1.
 *
 * @param {Object} error_frame - object whose `headers` and `body` are logged.
 *   The message handlers in this script pass an empty object, in which case
 *   both fields print as undefined.
 */
var errorHandler = function(error_frame) {
  var shutDown = function() {
    publisher.disconnect();
    subscriber1.disconnect();
    subscriber2.disconnect();
    process.exit(1);
  };

  console.log("error");
  console.log("headers: " + sys.inspect(error_frame.headers));
  console.log("body: " + error_frame.body);

  // Tear everything down only after a 2000 ms delay.
  setTimeout(shutDown, 2000);
};
/**
 * Creates a STOMP client for the given login name.
 *
 * The shared stomp_args object is updated in place before being handed to the
 * constructor, so after this call stomp_args.login holds `login`.
 * NOTE(review): this relies on stomp.Stomp reading the login at construction
 * time rather than keeping a live reference to stomp_args — confirm.
 *
 * @param {string} login - login name to connect with.
 * @returns {Object} the newly constructed stomp.Stomp client.
 */
function createClient(login) {
  stomp_args.login = login;
  return new stomp.Stomp(stomp_args);
}

// Construction order matters only in that stomp_args.login ends up as "pub2".
var subscriber1 = createClient("sub1");
var subscriber2 = createClient("sub2");
var publisher = createClient("pub2");
// --- subscriber1 event wiring ---

// On connect: subscribe with client-side acknowledgement and ask for a
// receipt so the "receipt" handler can record that the subscription exists.
subscriber1.on("connected", function() {
  var subscribeHeaders = {
    destination: destination,
    id : "1",
    ack: "client",
    receipt: "sub1"
  };
  subscriber1.subscribe(subscribeHeaders);
});

// Receipts: "sub1" marks the subscription as established, "unsub1" means the
// unsubscribe was processed and the client can disconnect.
subscriber1.on("receipt", function(id) {
  switch (id) {
    case "sub1":
      subscriber1.subscribed = 1;
      break;
    case "unsub1":
      subscriber1.disconnect();
      break;
    default:
      console.error("sub1 unexpected receipt " + id);
  }
});

// Messages: verify the body, then ack and unsubscribe; any mismatch goes to
// the shared error handler.
subscriber1.on("message", function(message) {
  // Concatenating with "" coerces the body to a string before comparing.
  if (testMessage !== "" + message.body) {
    console.error("wrong msg");
    console.error("want:" + testMessage);
    console.error("got:" + message.body);
    errorHandler({});
    return;
  }

  subscriber1.ack(message.headers["message-id"]);
  var unsubscribeHeaders = {
    destination: destination,
    id : "1",
    receipt: "unsub1"
  };
  subscriber1.unsubscribe(unsubscribeHeaders);
});

subscriber1.on("error", errorHandler);
// --- subscriber2 event wiring ---

// On connect: subscribe with client-side acknowledgement and request a
// receipt so the "receipt" handler can record that the subscription exists.
subscriber2.on("connected", function() {
  subscriber2.subscribe({
    destination: destination,
    id : "2",
    ack: "client",
    receipt: "sub2"
  });
});

// Receipts: "sub2" marks the subscription as established, "unsub2" means the
// unsubscribe was processed and the client can disconnect.
subscriber2.on("receipt", function(id) {
  if (id === "sub2") {
    subscriber2.subscribed = 2;
  }
  else if (id === "unsub2") {
    subscriber2.disconnect();
  }
  else {
    // Include the offending id, as subscriber1's receipt handler does, so an
    // unexpected receipt can actually be diagnosed from the log.
    console.error("sub2 unexpected receipt " + id);
  }
});

// Messages: verify the body, then ack and unsubscribe; any mismatch goes to
// the shared error handler.
subscriber2.on("message", function(message) {
  // Concatenating with "" coerces the body to a string before comparing.
  if (testMessage !== "" + message.body) {
    console.error("wrong msg");
    console.error("want:" + testMessage);
    console.error("got:" + message.body);
    errorHandler({});
  }
  else {
    subscriber2.ack(message.headers["message-id"]);
    subscriber2.unsubscribe({
      destination: destination,
      id : "2",
      receipt: "unsub2"
    });
  }
});

subscriber2.on("error", errorHandler);
// --- publisher event wiring ---
// The publisher client is constructed earlier in the file together with the
// subscribers; it is not re-created here, since a second construction would
// only discard that first instance.

/**
 * Publishes the test message to the shared destination and asks the broker
 * for a "send" receipt.
 */
function publishTestMessage() {
  publisher.send({
    "destination" : destination,
    "foo" : "1",
    "body" : testMessage,
    "receipt" : "send"
  });
}

// On connect: send as soon as both subscribers have recorded their
// subscription receipts. If they have not yet, re-check every 100 ms instead
// of sending blindly after a single delay.
publisher.on("connected", function() {
  var POLL_INTERVAL_MS = 100;
  // After this many re-checks the message is sent anyway (best effort), so a
  // missing subscribe receipt cannot stall the publisher forever.
  var MAX_POLLS = 50;
  var polls = 0;

  (function sendWhenSubscribed() {
    var ready = subscriber1.subscribed === 1 && subscriber2.subscribed === 2;
    if (ready || polls >= MAX_POLLS) {
      publishTestMessage();
      return;
    }
    polls += 1;
    setTimeout(sendWhenSubscribed, POLL_INTERVAL_MS);
  })();
});

// The "send" receipt confirms the broker processed the message; the
// publisher has nothing left to do and disconnects.
publisher.on("receipt", function(id) {
  if (id === "send") {
    publisher.disconnect();
  }
});

publisher.on("error", errorHandler);
// On SIGINT, disconnect every client: publisher first, then the subscribers.
process.on("SIGINT", function() {
  [publisher, subscriber1, subscriber2].forEach(function(client) {
    client.disconnect();
  });
});

// Kick off all three connections: both subscribers, then the publisher.
[subscriber1, subscriber2, publisher].forEach(function(client) {
  client.connect();
});