__all__ = [
"error",
"start_new_thread",
"exit",
"get_ident",
"allocate_lock",
"interrupt_main",
"LockType",
"RLock",
"_count",
"start_joinable_thread",
"daemon_threads_allowed",
"_shutdown",
"_make_thread_handle",
"_ThreadHandle",
"_get_main_thread_ident",
"_is_main_interpreter",
"_local",
]
TIMEOUT_MAX = 2**31
_MAIN_THREAD_IDENT = -1
error = RuntimeError
def start_new_thread(function, args, kwargs={}):
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs)
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
_main = True
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt
def start_joinable_thread(function, handle=None, daemon=True):
if handle is None:
handle = _ThreadHandle()
try:
function()
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
handle._set_done()
return handle
def daemon_threads_allowed():
return True
def _shutdown():
pass
def _make_thread_handle(ident):
handle = _ThreadHandle()
handle._ident = ident
return handle
def _get_main_thread_ident():
return _MAIN_THREAD_IDENT
def _is_main_interpreter():
return True
def exit():
raise SystemExit
def get_ident():
return _MAIN_THREAD_IDENT
def allocate_lock():
return LockType()
def stack_size(size=None):
if size is not None:
raise error("setting thread stack size not supported")
return 0
def _set_sentinel():
return LockType()
def _count():
return 0
class LockType(object):
def __init__(self):
self.locked_status = False
def acquire(self, waitflag=None, timeout=-1):
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
if timeout > 0:
import time
time.sleep(timeout)
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
def _at_fork_reinit(self):
self.locked_status = False
def __repr__(self):
return "<%s %s.%s object at %s>" % (
"locked" if self.locked_status else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
hex(id(self)),
)
class _ThreadHandle:
def __init__(self):
self._ident = _MAIN_THREAD_IDENT
self._done = False
@property
def ident(self):
return self._ident
def _set_done(self):
self._done = True
def is_done(self):
return self._done
def join(self, timeout=None):
return
def __repr__(self):
return f"<_ThreadHandle ident={self._ident}>"
_interrupt = False
_main = True
def interrupt_main():
if _main:
raise KeyboardInterrupt
else:
global _interrupt
_interrupt = True
class RLock:
def __init__(self):
self.locked_count = 0
def acquire(self, waitflag=None, timeout=-1):
self.locked_count += 1
return True
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
if not self.locked_count:
raise error
self.locked_count -= 1
return True
def locked(self):
return self.locked_count != 0
def __repr__(self):
return "<%s %s.%s object owner=%s count=%s at %s>" % (
"locked" if self.locked_count else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
get_ident() if self.locked_count else 0,
self.locked_count,
hex(id(self)),
)
class _local:
def __init__(self):
object.__setattr__(self, "_local__impl", {})
def __getattribute__(self, name):
if name.startswith("_local__"):
return object.__getattribute__(self, name)
impl = object.__getattribute__(self, "_local__impl")
try:
return impl[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
if name.startswith("_local__"):
return object.__setattr__(self, name, value)
impl = object.__getattribute__(self, "_local__impl")
impl[name] = value
def __delattr__(self, name):
if name.startswith("_local__"):
return object.__delattr__(self, name)
impl = object.__getattribute__(self, "_local__impl")
try:
del impl[name]
except KeyError:
raise AttributeError(name)