o"""
pynorm - Python wrapper for NRL's libnorm
By: Tom Wambold <wambold@itd.nrl.navy.mil>
"""
from __future__ import absolute_import
import ctypes
import locale
from typing import Optional
import pynorm.constants as c
from pynorm.core import libnorm, NormError
from pynorm.object import Object
class Session(object):
locale_encoding:str = locale.getpreferredencoding()
def __init__(self, instance, address:str, port:int, localId=c.NORM_NODE_ANY):
self._instance = instance
self._session:int = libnorm.NormCreateSession(instance, address.encode('utf-8'), port, localId)
self.sendGracefulStop = False
self.gracePeriod = 0
def destroy(self):
libnorm.NormDestroySession(self)
del self._instance._sessions[self]
def setUserData(self, data:str):
libnorm.NormSetUserData(self, data.encode('utf-8'))
def getUserData(self)->str:
data = libnorm.NormGetUserData(self)
return data.decode('utf-8') if data else None
def getNodeId(self):
return libnorm.NormGetLocalNodeId(self)
def setTxPort(self, txPort:int, enableReuse:bool=False, txBindAddr:Optional[str]=None):
libnorm.NormSetTxPort(self, txPort, enableReuse,
txBindAddr.encode('utf-8') if txBindAddr else None )
def setTxOnly(self, txOnly:bool=False, connectToSessionAddress:bool=False):
libnorm.NormSetTxOnly(self, txOnly, connectToSessionAddress)
def setRxPortReuse(self, enable:bool, rxBindAddr:Optional[str]=None, senderAddr:Optional[str]=None, senderPort:int=0):
libnorm.NormSetRxPortReuse(self, enable,
rxBindAddr.encode('utf-8') if rxBindAddr else None,
senderAddr.encode('utf-8') if senderAddr else None,
senderPort)
def setMulticastInterface(self, iface:str):
libnorm.NormSetMulticastInterface(self, iface.encode('utf-8'))
def setSSM(self, srcAddr:str):
libnorm.NormSetSSM(self, srcAddr.encode('utf-8'))
def setTTL(self, ttl) -> bool:
return libnorm.NormSetTTL(self, ttl)
def setTOS(self, tos) -> bool:
return libnorm.NormSetTOS(self, tos)
def setLoopback(self, loopbackEnable:bool=False):
libnorm.NormSetLoopback(self, loopbackEnable)
def setMulticastLoopback(self, enable:bool):
libnorm.NormSetMulticastLoopback(self, enable)
def setTxLoss(self, percent:float):
libnorm.NormSetTxLoss(percent)
def setRxLoss(self, percent:float):
libnorm.NormSetRxLoss(percent)
def getReportInterval(self) -> int:
return libnorm.NormGetReportInterval(self)
def setReportInterval(self, interval:int):
libnorm.NormSetReportInterval(self, interval)
def startSender(self, sessionId:int, bufferSpace:int, segmentSize:int, blockSize:int, numParity:int, fecId=0) -> bool:
return libnorm.NormStartSender(self, sessionId, bufferSpace, segmentSize, blockSize, numParity, fecId)
def stopSender(self):
libnorm.NormStopSender(self)
def setTxRate(self, txRate:float):
libnorm.NormSetTxRate(self, txRate)
def getTxRate(self) -> float:
return libnorm.NormGetTxRate(self)
def setTxSocketBuffer(self, size:int):
libnorm.NormSetTxSocketBuffer(self, size)
def setCongestionControl(self, ccEnable:bool, adjustRate:bool=True):
libnorm.NormSetCongestionControl(self, ccEnable, adjustRate)
def setEcnSupport(self, ecnEnable, ignoreLoss=False, tolerateLoss=False):
libnorm.NormSetEcnSupport(self, ecnEnable, ignoreLoss, tolerateLoss)
def setFlowControl(self, flowControlFactor):
libnorm.NormSetFlowControl(self, flowControlFactor)
def setTxRateBounds(self, rateMin, rateMax) -> bool:
if rateMin > rateMax:
return False
libnorm.NormSetTxRateBounds(self, rateMin, rateMax)
return True
def setTxCacheBounds(self, sizeMax:int, countMin:int, countMax:int):
libnorm.NormSetTxCacheBounds(self, sizeMax, countMin, countMax)
def setAutoParity(self, parity:int):
libnorm.NormSetAutoParity(self, parity)
def getGrttEstimate(self) -> float:
return libnorm.NormGetGrttEstimate(self)
def setGrttEstimate(self, grtt:float):
return libnorm.NormSetGrttEstimate(self, grtt)
def setGrttMax(self, grttMax:float):
return libnorm.NormSetGrttMax(self, grttMax)
def setGrttProbingMode(self, probingMode:c.ProbingMode):
libnorm.NormSetGrttProbingMode(self, probingMode.value)
def setGrttProbingInterval(self, intervalMin:int, intervalMax:int):
libnorm.NormSetGrttProbingInterval(self, intervalMin, intervalMax)
def setBackoffFactor(self, factor:int):
libnorm.NormSetBackoffFactor(self, factor)
def setGroupSize(self, size:int):
libnorm.NormSetGroupSize(self, size)
def setTxRobustFactor(self, robustFactor:int):
libnorm.NormSetTxRobustFactor(self, robustFactor)
def fileEnqueue(self, filename:str, info:bytes=b""):
result = libnorm.NormFileEnqueue(self, filename.encode(self.locale_encoding), info, len(info))
if ctypes.c_void_p.in_dll(libnorm, "NORM_OBJECT_INVALID") == result:
return None; else:
obj = self._instance._objects[result] = Object(result)
return obj
def dataEnqueue(self, data:bytes, info:bytes=b""):
result = libnorm.NormDataEnqueue(self, data, len(data), info, len(info))
if ctypes.c_void_p.in_dll(libnorm, "NORM_OBJECT_INVALID") == result:
return None; else:
obj = self._instance._objects[result] = Object(result)
return obj
def streamOpen(self, bufferSize:int, info=b""):
result = libnorm.NormStreamOpen(self, bufferSize, info, len(info))
if ctypes.c_void_p.in_dll(libnorm, "NORM_OBJECT_INVALID") == result:
return None; else:
obj = self._instance._objects[result] = Object(result)
return obj
def requeueObject(self, normObject):
libnorm.NormRequeueObject(self, normObject)
def sendCommand(self, cmdBuffer:bytes, robust:bool=False) -> bool:
return libnorm.NormSendCommand(self, cmdBuffer, len(cmdBuffer), robust)
def cancelCommand(self):
libnorm.NormCancelCommand(self)
def setWatermark(self, normObject:Object, overrideFlush=False) -> bool:
return libnorm.NormSetWatermark(self, normObject, overrideFlush)
def resetWatermark(self) -> bool:
return libnorm.NormResetWatermark(self)
def cancelWatermark(self) -> None:
libnorm.NormCancelWatermark(self)
def addAckingNode(self, nodeId:int) -> bool:
return libnorm.NormAddAckingNode(self, nodeId)
def removeAckingNode(self, nodeId:int) -> None:
libnorm.NormRemoveAckingNode(self, nodeId)
def setAutoAckingNodes(self, trackingStatus:c.TrackingStatus) -> None:
libnorm.NormSetAutoAckingNodes(self, trackingStatus.value)
def getNextAckingNode(self, nodeID:int=c.NORM_NODE_NONE) -> (bool, int,int):
ackingStatus = 0
nodeIdBytes = ctypes.c_uint(ctypes.sizeof(nodeId))
ackingStatusBytes = ctypes.c_uint(ctypes.sizeof(ackingStatus))
isSuccess = libnorm.NormGetNextAckingNode(self, ctypes.byref(nodeIdBytes),ctypes.byref(ackingStatusBytes) )
return (isSuccess, nodeIdBytes.value, c.AckingStatus(ackingStatusBytes.value) if isSuccess else c.AckingStatus.INVALID)
def getAckingStatus(self, nodeId:int=c.NORM_NODE_ANY) -> c.AckingStatus:
return c.AckingStatus( libnorm.NormGetAckingStatus(self, nodeId) )
def startReceiver(self, bufferSpace:int) -> bool:
return libnorm.NormStartReceiver(self, bufferSpace)
def stopReceiver(self) -> None:
libnorm.NormStopReceiver(self)
def setRxCacheLimit(self, count:int):
libnorm.NormSetRxCacheLimit(self, count)
def setRxSocketBuffer(self, size:int):
libnorm.NormSetRxSocketBuffer(self, size)
def setSilentReceiver(self, silent:bool, maxDelay:Optional[int]=None):
if maxDelay == None:
maxDelay = -1
libnorm.NormSetSilentReceiver(self, silent, maxDelay)
def setDefaultUnicastNack(self, enable:bool):
libnorm.NormSetDefaultUnicastNack(self, enable)
def setDefaultSyncPolicy(self, policy:c.SyncPolicy):
libnorm.NormSetDefaultSyncPolicy(self, policy.value)
def setDefaultNackingMode(self, mode:c.NackingMode):
libnorm.NormSetDefaultNackingMode(self, mode.value)
def setDefaultRepairBoundary(self, boundary:c.RepairBoundary):
libnorm.NormSetDefaultRepairBoundary(self, boundary.value)
def setDefaultRxRobustFactor(self, rxRobustFactor:int):
libnorm.NormSetDefaultRxRobustFactor(rxRobustFactor)
def setMessageTrace(self, state):
libnorm.NormSetMessageTrace(self, state)
nodeId:int = property(getNodeId)
grtt:float = property(getGrttEstimate, setGrttEstimate)
userData:str = property(getUserData, setUserData)
reportInterval:int = property(getReportInterval, setReportInterval)
def __del__(self):
self.stopReceiver()
self.stopSender()
libnorm.NormDestroySession(self)
@property
def _as_parameter_(self):
return self._session
def __cmp__(self, other):
def cmp(a, b):
return (a > b) - (a < b)
try:
return cmp(self._as_parameter_, other._as_parameter)
except AttributeError:
return cmp(self._as_parameter_, other)
def __hash__(self):
return self._as_parameter_