import importlib
import os
import pickle
import sys
import _thread
import traceback
import socket
import logging
LOG = logging.getLogger(__name__)
PORT = 12032
safe_modules = {
'numpy',
'numpy.core.multiarray',
}
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module in safe_modules:
return getattr(importlib.import_module(module), name)
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
class FileSock:
" wraps a socket so that it is usable by pickle/cPickle "
def __init__(self,sock):
self.sock = sock
self.nr=0
def write(self, buf):
bs = 512 * 1024
ns = 0
while ns < len(buf):
sent = self.sock.send(buf[ns:ns + bs])
ns += sent
def read(self,bs=512*1024):
self.nr+=1
b = []
nb = 0
while len(b)<bs:
rb = self.sock.recv(bs - nb)
if not rb: break
b.append(rb)
nb += len(rb)
return b''.join(b)
def readline(self):
s=bytes()
while True:
c=self.read(1)
s+=c
if len(c)==0 or chr(c[0])=='\n':
return s
class ClientExit(Exception):
pass
class ServerException(Exception):
pass
class Server:
def __init__(self, s, logf=sys.stderr, log_prefix=''):
self.logf = logf
self.log_prefix = log_prefix
self.conn = s
self.fs = FileSock(s)
def log(self, s):
self.logf.write("Sever log %s: %s\n" % (self.log_prefix, s))
def one_function(self):
try:
(fname, args) = RestrictedUnpickler(self.fs).load()
except EOFError:
raise ClientExit("read args")
self.log("executing method %s"%(fname))
st = None
ret = None
try:
f=getattr(self,fname)
except AttributeError:
st = AttributeError("unknown method "+fname)
self.log("unknown method")
try:
ret = f(*args)
except Exception as e:
st="".join(traceback.format_tb(sys.exc_info()[2]))+str(e)
self.log("exception in method")
traceback.print_exc(50,self.logf)
self.logf.flush()
LOG.info("return")
try:
pickle.dump((st ,ret), self.fs, protocol=4)
except EOFError:
raise ClientExit("function return")
def exec_loop(self):
self.log("in exec_loop")
try:
while True:
self.one_function()
except ClientExit as e:
self.log("ClientExit %s"%e)
except socket.error as e:
self.log("socket error %s"%e)
traceback.print_exc(50,self.logf)
except EOFError:
self.log("EOF during communication")
traceback.print_exc(50,self.logf)
except BaseException:
traceback.print_exc(50,sys.stderr)
sys.exit(1)
LOG.info("exit sever")
def exec_loop_cleanup(self):
pass
def get_ps_stats(self):
ret=''
f=os.popen("echo ============ `hostname` uptime:; uptime;"+
"echo ============ self:; "+
"ps -p %d -o pid,vsize,rss,%%cpu,nlwp,psr; "%os.getpid()+
"echo ============ run queue:;"+
"ps ar -o user,pid,%cpu,%mem,ni,nlwp,psr,vsz,rss,cputime,command")
for l in f:
ret+=l
return ret
class Client:
def __init__(self, HOST, port=PORT, v6=False):
socktype = socket.AF_INET6 if v6 else socket.AF_INET
sock = socket.socket(socktype, socket.SOCK_STREAM)
LOG.info("connecting to %s:%d, socket type: %s", HOST, port, socktype)
sock.connect((HOST, port))
self.sock = sock
self.fs = FileSock(sock)
def generic_fun(self, fname, args):
pickle.dump((fname, args), self.fs, protocol=4)
return self.get_result()
def get_result(self):
(st, ret) = RestrictedUnpickler(self.fs).load()
if st!=None:
raise ServerException(st)
else:
return ret
def __getattr__(self,name):
return lambda *x: self.generic_fun(name,x)
def run_server(new_handler, port=PORT, report_to_file=None, v6=False):
HOST = '' socktype = socket.AF_INET6 if v6 else socket.AF_INET
s = socket.socket(socktype, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.info("bind %s:%d", HOST, port)
s.bind((HOST, port))
s.listen(5)
LOG.info("accepting connections")
if report_to_file is not None:
LOG.info('storing host+port in %s', report_to_file)
open(report_to_file, 'w').write('%s:%d ' % (socket.gethostname(), port))
while True:
try:
conn, addr = s.accept()
except socket.error as e:
if e[1]=='Interrupted system call': continue
raise
LOG.info('Connected to %s', addr)
ibs = new_handler(conn)
tid = _thread.start_new_thread(ibs.exec_loop,())
LOG.debug("Thread ID: %d", tid)