from cec import cec
from sys import version_info
class pyCecClient:
    """Interactive command-line client built on the libCEC Python bindings.

    The client fills in a ``cec.libcec_configuration``, creates an adapter
    object from it, opens the CEC adapter that ``DetectAdapter`` reports and
    then reads commands from standard input until the user quits.
    """

    # Class-level defaults, kept so that code which reads these attributes on
    # the class (or on an instance built without ``__init__``) keeps working.
    # ``__init__`` replaces ``cecconfig`` with a per-instance object.
    cecconfig = cec.libcec_configuration()
    # Placeholder; InitLibCec() stores the real adapter object here.
    lib = {}
    # LogCallback() drops every message whose level is above this value.
    log_level = cec.CEC_LOG_TRAFFIC

    def __init__(self):
        """Create a private configuration object and fill in its defaults."""
        # A configuration shared through the class attribute would be mutated
        # by every instance (device types appended again, callbacks
        # overwritten), so each client gets its own.
        self.cecconfig = cec.libcec_configuration()
        self.SetConfiguration()

    def SetConfiguration(self):
        """Populate ``self.cecconfig`` with this client's fixed settings."""
        self.cecconfig.strDeviceName = "pyLibCec"
        self.cecconfig.bActivateSource = 0
        self.cecconfig.deviceTypes.Add(cec.CEC_DEVICE_TYPE_RECORDING_DEVICE)
        self.cecconfig.clientVersion = cec.LIBCEC_VERSION_CURRENT

    def SetLogCallback(self, callback):
        """Register *callback* as the log callback on the configuration."""
        self.cecconfig.SetLogCallback(callback)

    def SetKeyPressCallback(self, callback):
        """Register *callback* as the key-press callback on the configuration."""
        self.cecconfig.SetKeyPressCallback(callback)

    def SetCommandCallback(self, callback):
        """Register *callback* as the command callback on the configuration."""
        self.cecconfig.SetCommandCallback(callback)

    def DetectAdapter(self):
        """Print every detected adapter and return the port of the last one.

        Returns:
            The ``strComName`` of the last adapter reported by
            ``self.lib.DetectAdapters()``, or ``None`` when none is reported.
        """
        retval = None
        for adapter in self.lib.DetectAdapters():
            print("found a CEC adapter:")
            print("port: " + adapter.strComName)
            print("vendor: " + hex(adapter.iVendorId))
            print("product: " + hex(adapter.iProductId))
            # Deliberately overwritten on each pass: the last adapter wins.
            retval = adapter.strComName
        return retval

    def InitLibCec(self):
        """Create the adapter object, open a connection and run the prompt.

        Prints a diagnostic and returns without entering the main loop when
        no adapter is found or the connection cannot be opened.
        """
        self.lib = cec.ICECAdapter.Create(self.cecconfig)
        print("libCEC version " + self.lib.VersionToString(self.cecconfig.serverVersion) + " loaded: " + self.lib.GetLibInfo())

        adapter = self.DetectAdapter()
        if adapter is None:
            print("No adapters found")
            return

        if self.lib.Open(adapter):
            print("connection opened")
            self.MainLoop()
        else:
            print("failed to open a connection to the CEC adapter")

    def ProcessCommandSelf(self):
        """Print the logical addresses controlled by this libCEC instance.

        Addresses for which ``IsActiveSource`` is true are marked ``(*)``.
        """
        addresses = self.lib.GetLogicalAddresses()
        entries = []
        # Logical addresses 0..14 are inspected; 15 is skipped
        # (presumably the broadcast/unregistered address - TODO confirm).
        for x in range(15):
            if not addresses.IsSet(x):
                continue
            entry = self.lib.LogicalAddressToString(x)
            if self.lib.IsActiveSource(x):
                entry += " (*)"
            entries.append(entry)
        print("Addresses controlled by libCEC: " + ", ".join(entries))

    def ProcessCommandActiveSource(self):
        """Ask libCEC to make this client the active source."""
        self.lib.SetActiveSource()

    def ProcessCommandStandby(self):
        """Send the standby request to the broadcast address."""
        self.lib.StandbyDevices(cec.CECDEVICE_BROADCAST)

    def ProcessCommandTx(self, data):
        """Parse *data* into a CEC command, transmit it and report the result.

        Args:
            data: Textual command, passed unchanged to
                ``self.lib.CommandFromString``.
        """
        cmd = self.lib.CommandFromString(data)
        print("transmit " + data)
        if self.lib.Transmit(cmd):
            print("command sent")
        else:
            print("failed to send command")

    def ProcessCommandScan(self):
        """Query every active device on the bus and print a report."""
        print("requesting CEC bus information ...")
        report = ["CEC bus information\n===================\n"]
        addresses = self.lib.GetActiveDevices()
        # The result is not used below; the call itself is kept because it may
        # have side effects inside libCEC - TODO confirm and drop if not.
        self.lib.GetActiveSource()
        # Same 0..14 address range as ProcessCommandSelf().
        for x in range(15):
            if not addresses.IsSet(x):
                continue
            # All queries are issued before any formatting, in this order.
            vendorId = self.lib.GetDeviceVendorId(x)
            physicalAddress = self.lib.GetDevicePhysicalAddress(x)
            active = self.lib.IsActiveSource(x)
            cecVersion = self.lib.GetDeviceCecVersion(x)
            power = self.lib.GetDevicePowerStatus(x)
            osdName = self.lib.GetDeviceOSDName(x)
            report.append("device #" + str(x) + ": " + self.lib.LogicalAddressToString(x) + "\n")
            report.append("address: " + str(physicalAddress) + "\n")
            report.append("active source: " + str(active) + "\n")
            report.append("vendor: " + self.lib.VendorIdToString(vendorId) + "\n")
            report.append("CEC version: " + self.lib.CecVersionToString(cecVersion) + "\n")
            report.append("OSD name: " + osdName + "\n")
            report.append("power status: " + self.lib.PowerStatusToString(power) + "\n\n\n")
        print("".join(report))

    def ReadInput(self, prompt):
        """Read one line from standard input on both Python 2 and Python 3.

        Args:
            prompt: Text shown to the user before reading.

        Returns:
            The line entered by the user.
        """
        if version_info >= (3, 0, 0):
            return input(prompt)
        # Python 2: input() would evaluate the text, so raw_input() is used.
        return raw_input(prompt)  # noqa: F821

    def MainLoop(self):
        """Read and dispatch commands until the user quits or input ends.

        Recognised commands (case-insensitive): ``q``/``quit``, ``self``,
        ``as``/``activesource``, ``standby``, ``scan`` and ``tx <data>``.
        Anything else is ignored.
        """
        while True:
            try:
                command = self.ReadInput("Enter command:").lower()
            except EOFError:
                # Standard input was closed; treat it like an explicit quit
                # instead of ending with a traceback.
                break
            if command in ('q', 'quit'):
                break
            elif command == 'self':
                self.ProcessCommandSelf()
            elif command in ('as', 'activesource'):
                self.ProcessCommandActiveSource()
            elif command == 'standby':
                self.ProcessCommandStandby()
            elif command == 'scan':
                self.ProcessCommandScan()
            elif command.startswith('tx'):
                # Skip "tx" plus the single separator character after it.
                self.ProcessCommandTx(command[3:])
        print('Exiting...')

    def LogCallback(self, level, time, message):
        """Print a log message unless its level is above ``self.log_level``.

        Args:
            level: Log level, compared against the ``cec.CEC_LOG_*`` constants.
            time: Timestamp supplied with the message; printed via ``str()``.
            message: Message text.

        Returns:
            Always ``0``.
        """
        if level > self.log_level:
            return 0

        if level == cec.CEC_LOG_ERROR:
            levelstr = "ERROR: "
        elif level == cec.CEC_LOG_WARNING:
            levelstr = "WARNING: "
        elif level == cec.CEC_LOG_NOTICE:
            levelstr = "NOTICE: "
        elif level == cec.CEC_LOG_TRAFFIC:
            levelstr = "TRAFFIC: "
        elif level == cec.CEC_LOG_DEBUG:
            levelstr = "DEBUG: "
        else:
            # An unrecognised level used to leave ``levelstr`` unbound and
            # raise UnboundLocalError below; print it without a prefix.
            levelstr = ""

        print(levelstr + "[" + str(time) + "] " + message)
        return 0

    def KeyPressCallback(self, key, duration):
        """Print the pressed key code; *duration* is accepted but not used.

        Returns:
            Always ``0``.
        """
        print("[key pressed] " + str(key))
        return 0

    def CommandCallback(self, cmd):
        """Print a received command.

        *cmd* is concatenated directly, so it is assumed to be a string -
        TODO confirm against the bindings.

        Returns:
            Always ``0``.
        """
        print("[command received] " + cmd)
        return 0
def log_callback(level, time, message):
    """Forward a log message to ``LogCallback`` on the global client.

    Looks up the module-level name ``lib`` at call time; that name is bound
    in the ``__main__`` section of this file.
    """
    handler = lib.LogCallback
    return handler(level, time, message)
def key_press_callback(key, duration):
    """Forward a key-press event to ``KeyPressCallback`` on the global client.

    Looks up the module-level name ``lib`` at call time; that name is bound
    in the ``__main__`` section of this file.
    """
    handler = lib.KeyPressCallback
    return handler(key, duration)
def command_callback(cmd):
    """Forward a received command to ``CommandCallback`` on the global client.

    Looks up the module-level name ``lib`` at call time; that name is bound
    in the ``__main__`` section of this file.
    """
    handler = lib.CommandCallback
    return handler(cmd)
if __name__ == '__main__':
    # Script entry point. The client must be bound to the global name ``lib``
    # because the module-level callback functions look it up under that name.
    lib = pyCecClient()

    # Register the three module-level forwarders, in the same order as before:
    # log, key press, command.
    for register, forwarder in (
        (lib.SetLogCallback, log_callback),
        (lib.SetKeyPressCallback, key_press_callback),
        (lib.SetCommandCallback, command_callback),
    ):
        register(forwarder)

    # Creates the adapter object, opens the connection and runs the prompt.
    lib.InitLibCec()