from threading import Thread, Lock
import sys
import random
import pynorm
import signal
import time
from collections import deque
MSG_HDR_SIZE = 2
class InputThread(Thread):
def __init__(self, parent, *args, **kwargs):
super(InputThread, self).__init__(*args, **kwargs)
self.daemon = True
self.msgr = parent
def run(self):
while True:
try:
msgHdr = sys.stdin.buffer.read(MSG_HDR_SIZE)
except:
sys.stderr.write("normMsgr: input thread end-of-file 1 ...\n")
return
try:
msgSize = 256*int(msgHdr[0]) + int(msgHdr[1])
msgBuffer = sys.stdin.buffer.read(msgSize - 2)
except:
sys.stderr.write("normMsgr: input thread end-of-file 2 ...\n")
return
msgr.sendMessage(msgBuffer) ;
class OutputThread(Thread):
def __init__(self, parent, *args, **kwargs):
super(OutputThread, self).__init__(*args, **kwargs)
self.daemon = True
self.msgr = parent
def run(self):
while True:
msg = msgr.getRxMsg() ; msgLen = len(msg) + MSG_HDR_SIZE
msgHeader = bytearray(MSG_HDR_SIZE)
msgHeader[0] = (msgLen >> 8) & 0x00ff
msgHeader[1] = msgLen & 0x00ff
try:
sys.stdout.buffer.write(msgHeader)
sys.stdout.buffer.write(msg)
sys.stdout.flush()
except:
sys.stderr.write("normMsgr: output thread exiting ...\n")
return
del msg
class NormMsgr:
def __init__(self):
self.normInstance = pynorm.Instance()
self.normSession = None
self.normTxLock = Lock() ; self.normTxReady = Lock()
self.norm_tx_vacancy = True
self.norm_tx_queue_count = 0
self.norm_tx_queue_max = 2048
self.norm_tx_watermark_pending = False
self.norm_acking = False
self.tx_msg_cache = {}
self.normRxLock = Lock()
self.normRxReady = Lock()
self.normRxReady.acquire() ; self.output_msg_queue = deque()
random.seed(None) ;
def openNormSession(self, addr, port, nodeId):
self.normSession = self.normInstance.createSession(addr, port, nodeId)
self.normSession.setRxCacheLimit(2*self.norm_tx_queue_max) ; self.normSession.setDefaultSyncPolicy(pynorm.SyncPolicy.ALL);
self.normSession.setDefaultUnicastNack(True);
self.normSession.setTxCacheBounds(10*1024*1024, self.norm_tx_queue_max, self.norm_tx_queue_max);
self.normSession.setCongestionControl(True, True);
self.normSession.setRxPortReuse(True)
self.normSession.setMulticastLoopback(True)
return self.normSession
def addAckingNode(self, nodeId):
self.normSession.addAckingNode(nodeId);
self.norm_acking = True
def setNormMulticastInterface(self, ifaceName):
self.normSession.setMulticastInterface(ifaceName)
def setNormCCMode(self, ccMode):
if ccMode == "cc":
self.normSession.setEcnSupport(False, False, False)
elif ccMode == "cce":
self.normSession.setEcnSupport(True, True)
elif ccMode == "ccl":
self.normSession.setEcnSupport(False, False, True)
elif ccMode == "fixed":
self.normSession.setEcnSupport(False, False, False)
else:
raise Exception("normMsgr: invalid ccMode \"%s\"" % ccMode)
if ccMode != "fixed":
self.normSession.setCongestionControl(True)
else:
self.normSession.setCongetstionControl(False)
def setNormTxRate(self, bitsPerSecond):
self.normSession.setTxRate(bitsPerSecond)
def setNormDebugLevel(self, level):
self.normInstance.setDebugLevel(level)
def setNormMessageTrace(self, state):
self.normSession.setMessageTrace(state)
def start(self, send, recv):
if (recv):
self.normSession.startReceiver(10*1024*1024)
if (send):
if (self.norm_acking):
self.normSession.setFlowControl(0.0)
instanceId = random.randint(0, 0xffff)
self.normSession.startSender(instanceId, 10*1024*1024, 1400, 16, 4);
def stop(self):
del self.normInstance
self.normInstance = None
def sendMessage(self, msgBuf):
while not self.enqueueMessageObject(msgBuf):
continue
def enqueueMessageObject(self, msgBuf):
self.normTxReady.acquire() ; with self.normTxLock:
obj = self.normSession.dataEnqueue(msgBuf)
if obj is None:
self.norm_tx_vacancy = False ; return False
self.tx_msg_cache[obj] = msgBuf
if (self.norm_acking):
self.norm_tx_queue_count += 1
if not self.norm_tx_watermark_pending:
if self.norm_tx_queue_count >= self.norm_tx_queue_max/2:
self.normSession.setWatermark(obj)
self.norm_tx_watermark_pending = True
if self.norm_tx_queue_count >= self.norm_tx_queue_max:
return True
self.normTxReady.release()
return True
def onNormTxObjectPurged(self, obj):
with self.normTxLock:
if pynorm.NORM_OBJECT_DATA == obj.getType():
del self.tx_msg_cache[obj]
def onNormTxQueueVacancy(self):
with self.normTxLock:
wasTxReady = self.norm_tx_vacancy
if wasTxReady and self.norm_acking:
wasTxReady = self.norm_tx_queue_count < self.norm_tx_queue_max
self.norm_tx_vacancy = True
if self.norm_acking:
isTxReady = self.norm_tx_queue_count < self.norm_tx_queue_max
else:
isTxReady = False
if isTxReady and not wasTxReady:
if self.normTxReady.acquire(False):
sys.stderr.write("normMsgr onNormTxQueueVacancy() warning: normTxReady wasn't locked?!\n")
self.normTxReady.release()
def onNormTxWatermarkCompleted(self):
with self.normTxLock:
wasTxReady = self.norm_tx_vacancy
if wasTxReady and self.norm_acking:
wasTxReady = self.norm_tx_queue_count < self.norm_tx_queue_max
self.norm_tx_watermark_pending = False
self.norm_tx_queue_count -= self.norm_tx_queue_max / 2
isTxReady = self.norm_tx_vacancy
if isTxReady and self.norm_acking:
isTxReady = self.norm_tx_queue_count < self.norm_tx_queue_max
else:
isTxReady = False
if isTxReady and not wasTxReady:
if self.normTxReady.acquire(False):
sys.stderr.write("normMsgr onNormTxWatermarkCompleted() warning: normTxReady wasn't locked?!\n")
self.normTxReady.release()
def onNormRxObjectCompleted(self, obj):
with self.normRxLock:
if pynorm.ObjectType.DATA == obj.getType():
if 0 != len(self.output_msg_queue):
wasEmpty = False
else:
wasEmpty = True
msg = obj.getData()
self.output_msg_queue.append(msg)
if wasEmpty:
self.normRxReady.release() ;
def getRxMsg(self):
self.normRxReady.acquire() ; with self.normRxLock:
msg = self.output_msg_queue.popleft()
if 0 != len(self.output_msg_queue):
self.normRxReady.release() ; return msg
def getNextNormEvent(self):
if self.normInstance is None:
return None
else:
return self.normInstance.getNextEvent()
class NormEventHandler(Thread):
def __init__(self, parentMsgr, *args, **kwargs):
super(NormEventHandler, self).__init__(*args, **kwargs)
self.daemon = True
self.lock = Lock()
self.msgr = parentMsgr
def run(self):
self.lock.acquire()
while True:
try:
event = self.msgr.getNextNormEvent()
except:
sys.stderr.write("get next event exception\n");
self.lock.release()
return
if event is None:
break
if pynorm.EventType.EVENT_INVALID == event.type:
continue
elif pynorm.EventType.TX_QUEUE_EMPTY == event.type or pynorm.EventType.TX_QUEUE_VACANCY == event.type:
msgr.onNormTxQueueVacancy()
elif pynorm.EventType.TX_WATERMARK_COMPLETED == event.type:
if pynorm.EventType.ACK_SUCCESS == event.session.getAckingStatus():
msgr.onNormTxWatermarkCompleted()
else:
event.session.resetWatermark()
elif pynorm.EventType.TX_OBJECT_PURGED == event.type:
msgr.onNormTxObjectPurged(event.object)
elif pynorm.EventType.RX_OBJECT_COMPLETED == event.type:
msgr.onNormRxObjectCompleted(event.object)
sys.stderr.write("normMsgr: NormEventHandler thread exiting ...\n");
self.lock.release()
def usage():
sys.stderr.write("Usage: normMsgr.py id <nodeId> {send &| recv} [addr <addr>[/<port>]][ack <node1>[,<node2>,...]\n" +
" [cc|cce|ccl|rate <bitsPerSecond>][interface <name>][debug <level>][trace]\n")
nodeId = None
sessionAddr = "224.1.2.3"
sessionPort = 6003
send = False
recv = False
ccMode = "cc"
txRate = None
ackerList = []
debugLevel = 3
normTrace = False
mcastIface = None
cmd = None
val = None
try:
i = 1
while i < len(sys.argv):
cmd = sys.argv[i]
i += 1
if "id" == cmd:
val = sys.argv[i]
nodeId = int(val)
i += 1
elif "addr" == cmd:
val = sys.argv[i]
i += 1
if "/" in val:
field = val.split('/')
sessionAddr = field[0]
sessionPort = int(field[1])
else:
sessionAddr = val
elif "send" == cmd:
send = True
elif "recv" == cmd:
recv = True
elif "cc" == cmd:
ccMode = "cc"
elif "cce" == cmd:
ccMode = "cce"
elif "ccl" == cmd:
ccMode = "ccl"
elif "rate" == cmd:
val = sys.argv[i]
rxRate = float(val)
ccMode = "fixed"
i += 1
elif "ack" == cmd:
alist = sys.argv[i].split(',')
for val in alist:
ackerList.append(int(val))
elif "debug" == cmd:
val = sys.argv[i]
debugLevel = int(val)
i += 1
elif "trace" == cmd:
normTrace = True
else:
sys.stderr.write("normMsgr error: invalid command \"%s\"\n" % cmd)
except Exception as e:
sys.stderr.write("normMsgr \"" + cmd + " " + val + "\" argument error: " + e.__str__() + "\n")
usage()
sys.exit(-1)
if not send and not recv:
sys.stderr.write("normMsgr error: not configured to send or recv!\n")
usage()
sys.exit(-1)
if nodeId is None:
sys.stderr.write("normMsgr error: no local 'id' provided!\n")
usage()
sys.exit(-1)
msgr = NormMsgr()
msgr.setNormDebugLevel(debugLevel)
sys.stderr.write("normMsgr: opening norm session ...\n")
msgr.openNormSession(sessionAddr, sessionPort, nodeId)
if mcastIface:
msgr.setNormMulticastInterface(mcastIface)
for node in ackerList:
msgr.addAckingNode(node)
msgr.setNormCCMode(ccMode);
if "fixed" == ccMode:
msgr.setNormTxRate(txRate)
msgr.setNormMessageTrace(normTrace)
msgr.start(send, recv)
sys.stderr.write("normMsgr: starting NormEventHandler ...\n")
normEventHandler = NormEventHandler(msgr)
normEventHandler.start()
if send:
sys.stderr.write("normMsgr: starting input thread ...\n")
inputThread = InputThread(msgr)
inputThread.start()
if recv:
sys.stderr.write("normMsgr: starting output thread ...\n")
outputThread = OutputThread(msgr)
outputThread.start()
try:
sys.stderr.write("normMsgr: running (use Crtl-C to exit) ...\n")
while True:
time.sleep(5)
except KeyboardInterrupt:
pass
if send:
inputThread.join()
if recv:
outputThread.join()
sys.stderr.write("normMsgr: Done.\n")