"use strict";
module.exports = Service;
var util = require("../util/minimal");
// Make Service inherit from util.EventEmitter: the prototype is replaced with an object
// created from util.EventEmitter.prototype, and its `constructor` is pointed back at Service.
// This statement appears before the `function Service` declaration in the file; that works
// because function declarations are hoisted.
(Service.prototype = Object.create(util.EventEmitter.prototype)).constructor = Service;
/**
 * RPC service wrapper around a caller-supplied transport function.
 *
 * The transport function is stored on the instance and invoked as
 * `rpcImpl(method, encodedRequest, callback)` for each call, and as
 * `rpcImpl(null, null, null)` when the service is ended via {@link Service#end}
 * without the `endedByRPC` flag.
 *
 * @constructor
 * @param {function} rpcImpl Transport function performing the actual request
 * @param {boolean} [requestDelimited] Truthy to encode requests with `encodeDelimited` instead of `encode`
 * @param {boolean} [responseDelimited] Truthy to decode responses with `decodeDelimited` instead of `decode`
 * @throws {TypeError} If `rpcImpl` is not a function
 */
function Service(rpcImpl, requestDelimited, responseDelimited) {

    if (typeof rpcImpl !== "function") {
        throw new TypeError("rpcImpl must be a function");
    }

    // Run the emitter's constructor on this instance before adding own state.
    util.EventEmitter.call(this);

    // Set to null once the service has ended; doubles as the "still alive" flag.
    this.rpcImpl = rpcImpl;

    // Both flags are coerced to real booleans.
    this.requestDelimited = !!requestDelimited;
    this.responseDelimited = !!responseDelimited;
}

/**
 * Performs a single call through the transport function.
 *
 * Events emitted on the service: `"error"` with `(err, method)` when the
 * transport reports an error, when decoding the response throws, or when
 * issuing the call throws; `"data"` with `(response, method)` on success.
 *
 * @param {*} method Method descriptor, handed to the transport and to emitted events unchanged
 * @param {Object} requestCtor Object providing `encode` / `encodeDelimited`; the result's `finish()` is sent
 * @param {function} responseCtor Constructor providing `decode` / `decodeDelimited`; also used for the `instanceof` check
 * @param {*} request Request message (must be truthy)
 * @param {function} [callback] Node-style `(err, response)` callback
 * @returns {*} Without a callback, the value returned by `util.asPromise` (presumably a promise - confirm
 * against that helper). With a callback, whatever the transport function returns, or `undefined` when the
 * service has already ended or issuing the call threw.
 * @throws {TypeError} If `request` is falsy
 */
Service.prototype.rpcCall = function rpcCall(method, requestCtor, responseCtor, request, callback) {

    if (!request) {
        throw new TypeError("request must be specified");
    }

    var self = this;

    // No callback given: re-enter this same function through util.asPromise.
    if (!callback) {
        return util.asPromise(rpcCall, self, method, requestCtor, responseCtor, request);
    }

    // Service already ended: report asynchronously, never synchronously.
    if (!self.rpcImpl) {
        setTimeout(function() {
            callback(new Error("already ended"));
        }, 0);
        return undefined;
    }

    // Handler passed to the transport; receives either an error or a response.
    var onReply = function rpcCallback(rpcError, reply) {

        if (rpcError) {
            self.emit("error", rpcError, method);
            return callback(rpcError);
        }

        // A null response ends the service; the caller's callback is not invoked.
        if (reply === null) {
            self.end(true);
            return undefined;
        }

        // Anything that is not already a response instance gets decoded first.
        var message = reply;
        if (!(message instanceof responseCtor)) {
            try {
                var decoderName = self.responseDelimited ? "decodeDelimited" : "decode";
                message = responseCtor[decoderName](reply);
            } catch (decodeError) {
                self.emit("error", decodeError, method);
                return callback(decodeError);
            }
        }

        self.emit("data", message, method);
        return callback(null, message);
    };

    try {
        var encoderName = self.requestDelimited ? "encodeDelimited" : "encode";
        return self.rpcImpl(method, requestCtor[encoderName](request).finish(), onReply);
    } catch (callError) {
        // Encoding or the transport threw synchronously: emit now, notify the caller later.
        self.emit("error", callError, method);
        setTimeout(function() {
            callback(callError);
        }, 0);
        return undefined;
    }
};

/**
 * Ends the service. Does nothing if it has already ended.
 *
 * Unless `endedByRPC` is truthy, the transport function is first called with
 * `(null, null, null)`. The transport reference is then cleared, `"end"` is
 * emitted, and `off()` is called with no arguments on the value `emit` returns
 * (presumably dropping all listeners - confirm against the emitter).
 *
 * @param {boolean} [endedByRPC] Truthy when the end was triggered by the transport itself
 * @returns {Service} `this`
 */
Service.prototype.end = function end(endedByRPC) {

    if (!this.rpcImpl) {
        return this;
    }

    if (!endedByRPC) {
        this.rpcImpl(null, null, null);
    }

    this.rpcImpl = null;
    this.emit("end").off();

    return this;
};