"""Implements ProcessPoolExecutor.

Data flow between the pieces defined in this module:

  submit() -> _pending_work_items (work_id -> _WorkItem) and _work_ids
           -> _ExecutorManagerThread moves work ids into the call queue as
              _CallItem objects (queue bounded at max_workers +
              EXTRA_QUEUED_CALLS)
           -> worker processes (_process_worker) run each call
           -> _ResultItem objects come back on the result queue
           -> the manager thread completes the matching Future.

The manager thread is woken through a _ThreadWakeup pipe by submit(),
shutdown(), garbage collection of the executor, and interpreter exit
(_python_exit).
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception
# Registry mapping each live executor manager thread to its _ThreadWakeup.
# _python_exit uses it to wake and join the threads at interpreter exit.
# Keys are weak, so a collected thread object drops out on its own.
_threads_wakeups = weakref.WeakKeyDictionary()
# Flipped to True by _python_exit; submit() refuses new work afterwards.
_global_shutdown = False


class _ThreadWakeup:
    """One-way pipe used to interrupt the executor manager thread's wait.

    wakeup() may be called from any thread and becomes a no-op after
    close(). clear() drains pending wakeups without taking the lock; within
    this module it is only called from the manager thread's wait loop.
    """

    def __init__(self):
        self._closed = False
        self._lock = threading.Lock()
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; later calls do nothing."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        """Write an empty message to wake the reader, unless closed."""
        with self._lock:
            if self._closed:
                return
            self._writer.send_bytes(b"")

    def clear(self):
        """Discard every wakeup message currently waiting in the pipe.

        Raises:
            RuntimeError: if the pipe has already been closed.
        """
        if self._closed:
            raise RuntimeError('operation on closed _ThreadWakeup')
        reader = self._reader
        while reader.poll():
            reader.recv_bytes()
def _python_exit():
    """Flag interpreter shutdown, then wake and join every manager thread.

    All threads are woken before any is joined, so they can wind down
    concurrently rather than one after another.
    """
    global _global_shutdown
    _global_shutdown = True
    # Snapshot the weak registry: the list keeps the threads alive and is
    # immune to entries disappearing while we iterate.
    registered = list(_threads_wakeups.items())
    for _thread, wakeup in registered:
        wakeup.wakeup()
    for thread, _wakeup in registered:
        thread.join()


# threading's private atexit hook; presumably this runs before threading
# joins non-daemon threads at interpreter exit -- confirm against threading.
threading._register_atexit(_python_exit)

# How many call items may sit in the call queue beyond max_workers.
EXTRA_QUEUED_CALLS = 1

# Upper bound on workers on Windows. The "- 2" accounts for the result-queue
# reader and the wakeup reader, which are waited on alongside the worker
# sentinels; 63 is presumably the Windows wait-handle limit (verify).
_MAX_WINDOWS_WORKERS = 63 - 2
class _RemoteTraceback(Exception):
    """Exception whose str() is pre-formatted traceback text from a worker."""

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb


class _ExceptionWithTraceback:
    """Picklable wrapper carrying an exception plus its rendered traceback.

    On unpickling (see __reduce__), the original exception is rebuilt with
    a _RemoteTraceback holding the traceback text as its __cause__.
    """

    def __init__(self, exc, tb):
        # Render the traceback to text now, while the frames still exist.
        rendered = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Drop the live traceback so the frames it references can be freed.
        exc.__traceback__ = None
        self.tb = f'\n"""\n{rendered}"""'

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)


def _rebuild_exc(exc, tb):
    """Unpickle hook: attach the remote traceback text as exc.__cause__."""
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
class _WorkItem:
    """A submitted call waiting in the parent: its Future and call spec."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
class _ResultItem:
    """Outcome of one call, sent from a worker back to the parent.

    exit_pid is set when the sending worker is about to exit after this
    result.
    """

    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id, self.exception = work_id, exception
        self.result, self.exit_pid = result, exit_pid
class _CallItem:
    """A call shipped to a worker process through the call queue."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
class _SafeQueue(Queue):
    """Call queue that fails a work item's future when sending it fails.

    When the queue's feeder hits an error on a _CallItem (presumably a
    pickling error -- confirm), the error gets the formatted traceback
    attached as __cause__ and is set on the matching future. The manager
    thread is then woken. Errors for any other object go to the base class
    handler.
    """

    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if not isinstance(obj, _CallItem):
            super()._on_queue_feeder_error(e, obj)
            return
        formatted = ''.join(format_exception(type(e), e, e.__traceback__))
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(formatted))
        work_item = self.pending_work_items.pop(obj.work_id, None)
        self.thread_wakeup.wakeup()
        # The item may already be gone if the pool was marked broken, in
        # which case its future has been failed elsewhere.
        if work_item is None:
            return
        work_item.future.set_exception(e)
def _process_chunk(fn, chunk):
    """Apply fn to every argument tuple in chunk; return the results as a list.

    Runs inside a worker process: ProcessPoolExecutor.map submits whole
    batches of argument tuples so each task carries several calls.
    """
    return list(itertools.starmap(fn, chunk))
def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Post a _ResultItem for work_id on result_queue.

    If posting fails (presumably because the result cannot be pickled),
    a second _ResultItem is sent instead. It carries that failure as its
    exception, so the parent's future still completes.
    """
    try:
        result_queue.put(_ResultItem(work_id, exception=exception,
                                     result=result, exit_pid=exit_pid))
    except BaseException as e:
        failure = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=failure,
                                     exit_pid=exit_pid))
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Worker-process main loop: run _CallItems until told to stop.

    Args:
        call_queue: queue of _CallItem objects to execute; a None item
            tells the worker to exit.
        result_queue: receives a _ResultItem for each executed call (via
            _sendback_result), or this process's pid when None is received.
        initializer: optional callable run once before taking any work.
        initargs: positional arguments for initializer.
        max_tasks: if not None, the worker returns after this many calls.
            The _ResultItem for the last call carries this process's pid as
            exit_pid, so the parent knows the worker is going away.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # Exit without doing any work. The manager thread waits on
            # worker sentinels and treats a sentinel firing without a
            # result as a broken pool.
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Shutdown request: report our pid on the result queue and exit.
            result_queue.put(os.getpid())
            return
        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                # This is the last task; its result announces our exit.
                exit_pid = os.getpid()
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            # Wrap so the traceback text survives pickling to the parent.
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r
        # Drop the call (and with it its arguments) before blocking on the
        # next get(), so they can be freed while this worker is idle.
        del call_item
        if exit_pid is not None:
            return
class _ExecutorManagerThread(threading.Thread):
    """Parent-process thread that drives one ProcessPoolExecutor.

    Each pass of run() does the following:
      - moves submitted work into the call queue;
      - waits for a result, a worker exit or a wakeup;
      - completes the matching future.

    It tears the pool down once the executor is shutting down and nothing
    is pending. It marks the pool broken if a worker dies unexpectedly.

    Only a weak reference to the executor is kept. The executor objects the
    thread needs (queues, process map, pending-work dict, locks) are the
    executor's own instances, captured at construction time.

    Args:
        executor: the ProcessPoolExecutor to manage.
    """

    def __init__(self, executor):
        # Shared with the executor: the wakeup pipe and the shutdown lock.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # Runs when the executor is garbage collected. It wakes this thread
        # so run() re-checks is_shutting_down(). The wakeup object and the
        # debug function are bound as default arguments, presumably so the
        # callback holds no reference to self.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       mp_util_debug=mp.util.debug):
            mp_util_debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            thread_wakeup.wakeup()

        # Weak reference so this thread does not keep the executor alive.
        self.executor_reference = weakref.ref(executor, weakref_cb)

        # These are the executor's own objects, not copies, so mutations
        # made here are seen by the executor and vice versa.
        self.processes = executor._processes
        self.call_queue = executor._call_queue
        self.result_queue = executor._result_queue
        self.work_ids_queue = executor._work_ids
        self.max_tasks_per_child = executor._max_tasks_per_child
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        """Main loop; returns once the pool is shut down or broken."""
        while True:
            # This calls call_queue.put(); any failure here marks the pool
            # broken rather than letting the thread die silently.
            try:
                self.add_call_item_to_queue()
            except BaseException as exc:
                cause = format_exception(exc)
                self.terminate_broken(cause)
                return

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                # exit_pid is set when the worker reached max_tasks and is
                # exiting: reap it.
                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Release the result before waiting for the next one.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        # Possibly spawn a replacement for the exited worker.
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        # The worker that produced this result is idle again.
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # Queue any remaining work. Cancelled items pulled from the
                # work-id queue are dropped here, so the emptiness check below
                # is not blocked by futures that will never produce a result.
                self.add_call_item_to_queue()

                # No new work can be submitted any more. Once nothing is
                # pending, tear the pool down and exit.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        """Move work ids from work_ids_queue into call_queue as _CallItems.

        Stops when call_queue reports full or no work id is available.
        Each future is switched to running before its call is queued.
        Futures that were already cancelled are removed from
        pending_work_items instead.
        """
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]
                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        """Block until a result, a worker exit, or a wakeup signal arrives.

        Returns:
            A tuple (result_item, is_broken, cause):
              - result_item is the object read from the result queue, or
                None.
              - is_broken is True when reading a result failed, or when
                neither the result queue nor the wakeup pipe was ready
                (i.e. only worker sentinels fired).
              - cause is the formatted traceback of a read failure, else
                None.
        """
        result_reader = self.result_queue._reader
        # The wakeup pipe is only closed by _join_executor_internals.
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as exc:
                cause = format_exception(exc)
        elif wakeup_reader in ready:
            is_broken = False

        # Drain pending wakeups. clear() takes no lock; within this module
        # it is only called here, from the manager thread.
        self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        """Complete the future for a _ResultItem received from a worker.

        The work item may already have been removed from
        pending_work_items, e.g. by _SafeQueue._on_queue_feeder_error. In
        that case the result is ignored.
        """
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        if work_item is not None:
            if result_item.exception is not None:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        """Return True if the pool should stop accepting and running work.

        That is the case when interpreter exit has begun, the executor has
        been garbage collected, or its shutdown flag is set.
        """
        executor = self.executor_reference()
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def _terminate_broken(self, cause):
        """Mark the pool broken, fail all pending futures, kill the workers.

        Called by terminate_broken() with shutdown_lock held.

        Args:
            cause: list of formatted traceback lines explaining why the pool
                broke, or None. It is attached as __cause__ of the
                BrokenProcessPool set on each pending future.
        """
        # Flag the executor first so that submit() fails from now on.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # A single exception instance is set on every pending future.
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        for work_id, work_item in self.pending_work_items.items():
            try:
                work_item.future.set_exception(bpe)
            except _base.InvalidStateError:
                # The future was already cancelled or finished. Attempting
                # the call and ignoring the error avoids a check-then-set
                # race with cancel().
                pass
            # Drop the loop variable's reference to the work item.
            del work_item
        self.pending_work_items.clear()

        # Kill the remaining workers outright. Presumably the queues or
        # their locks may be left inconsistent by a dead worker.
        for p in self.processes.values():
            p.terminate()

        self.call_queue._terminate_broken()

        # Release the queues and wakeup pipe, then join the workers.
        self._join_executor_internals(broken=True)

    def terminate_broken(self, cause):
        """Run _terminate_broken() while holding the executor's shutdown lock."""
        with self.shutdown_lock:
            self._terminate_broken(cause)

    def flag_executor_shutting_down(self):
        """Set the executor's shutdown flag and honour cancel_futures.

        When cancellation was requested, this does three things:
          - cancels every pending future that has not started running;
          - drains the work-id queue;
          - clears the request so the work is done only once.
        """
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            if executor._cancel_pending_futures:
                # Keep only the work items whose futures could not be
                # cancelled (i.e. they are already running).
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                # NOTE(review): this rebinds self.pending_work_items to a
                # new dict. The executor's _pending_work_items and the call
                # queue's pending_work_items still refer to the original
                # dict, so a later pop by
                # _SafeQueue._on_queue_feeder_error would not remove the
                # entry from this new dict. Confirm that this cannot leave
                # shutdown waiting on an already-failed item.
                self.pending_work_items = new_pending_work_items
                # Nothing left to hand to the workers: drain the id queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Only do this once per shutdown request.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        """Send one None sentinel per live worker so each leaves its loop.

        put_nowait() can hit a full call queue. In that case the outer
        loop retries, re-counting live children each pass, until enough
        sentinels were sent or no child is alive.
        NOTE(review): the retry loop does not sleep, so it spins while the
        queue stays full.
        """
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        """Run _join_executor_internals() while holding the shutdown lock."""
        with self.shutdown_lock:
            self._join_executor_internals()

    def _join_executor_internals(self, broken=False):
        """Release the queues and wakeup pipe and join all worker processes.

        Args:
            broken: True when called from _terminate_broken(). No sentinels
                are sent and each worker is terminated before being joined.
        """
        # A broken pool's call queue was already torn down via
        # call_queue._terminate_broken(), so no sentinels are sent then.
        if not broken:
            self.shutdown_workers()
        self.call_queue.close()
        self.call_queue.join_thread()
        self.thread_wakeup.close()
        for p in self.processes.values():
            if broken:
                p.terminate()
            p.join()

    def get_n_children_alive(self):
        """Return how many worker processes are still alive."""
        return sum(p.is_alive() for p in self.processes.values())
# Memoized outcome of _check_system_limits(). The flag says whether the probe
# has completed; _system_limited holds the reason the platform is unsuitable
# (None if it is fine).
_system_limits_checked = False
_system_limited = None


def _probe_system_limits():
    """Return why this platform cannot run a process pool, or None if it can.

    Checks that multiprocessing.synchronize is importable. Where os.sysconf
    exposes SC_SEM_NSEMS_MAX, it also checks that at least 256 semaphores
    are available.
    """
    try:
        # Imported only to test that it is available.
        import multiprocessing.synchronize  # noqa: F401
    except ImportError:
        return (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # os.sysconf, or this particular setting, is not available.
        return None
    # -1 means no fixed limit is reported. 256 is the minimum stated as
    # necessary in the message below.
    if nsems_max == -1 or nsems_max >= 256:
        return None
    return ("system provides too few semaphores (%d"
            " available, 256 necessary)" % nsems_max)


def _check_system_limits():
    """Raise NotImplementedError if this platform cannot run a process pool.

    The probe runs once. Its outcome (success, or the failure message) is
    cached in _system_limits_checked / _system_limited, so later calls
    return or re-raise without probing again. If the probe itself raises
    an unexpected exception, nothing is cached and the next call retries.

    Raises:
        NotImplementedError: carrying the cached reason when the platform
            is unsuitable.
    """
    global _system_limits_checked, _system_limited
    if not _system_limits_checked:
        _system_limited = _probe_system_limits()
        _system_limits_checked = True
    if _system_limited:
        raise NotImplementedError(_system_limited)
def _chain_from_iterable_of_lists(iterable):
    """Flatten an iterable of lists, consuming each list as it goes.

    Each inner list is reversed in place and then popped from its end.
    Items therefore come out in their original order, and the list drops
    its reference to each item as soon as it is yielded. The lists are
    left empty.
    """
    for batch in iterable:
        batch.reverse()
        while batch:
            yield batch.pop()
class BrokenProcessPool(_base.BrokenExecutor):
    """Raised when the process pool can no longer be used.

    It is set as the exception of every pending future when the executor
    manager thread finds the pool broken, for example because a worker
    process terminated abruptly. ProcessPoolExecutor.submit() raises it
    from then on.
    """
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    The path of submitted work is:
      - submit() records it in _pending_work_items and _work_ids;
      - an _ExecutorManagerThread forwards calls to the workers over
        _call_queue;
      - the same thread completes futures from _result_queue.
    """

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initialize a new ProcessPoolExecutor.

        Args:
            max_workers: maximum number of worker processes.
              - If None, os.process_cpu_count() is used (1 if that is
                falsy), capped at _MAX_WINDOWS_WORKERS on Windows.
              - Otherwise it must be > 0 and, on Windows, at most
                _MAX_WINDOWS_WORKERS.
            mp_context: multiprocessing context used to create the
                processes and queues. If None, the 'spawn' context is used
                when max_tasks_per_child is given, else the default context.
            initializer: optional callable run in each worker before it
                starts taking work.
            initargs: positional arguments passed to initializer.
            max_tasks_per_child: if given, a positive int. Each worker exits
                after that many calls and the manager thread may spawn a
                replacement. Incompatible with the 'fork' start method.

        Raises:
            NotImplementedError: if _check_system_limits() finds the
                platform unsuitable.
            ValueError: for an out-of-range max_workers or
                max_tasks_per_child, or max_tasks_per_child used with 'fork'.
            TypeError: if initializer is not callable or max_tasks_per_child
                is not an int.
        """
        _check_system_limits()
        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                    max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
            self._max_workers = max_workers
        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context
        # With 'fork', workers must all be created before the manager thread
        # starts (see _launch_processes), so they are not spawned on demand.
        self._safe_to_dynamically_spawn_children = (
            self._mp_context.get_start_method(allow_none=False) != "fork")
        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs
        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            # Replacing exited workers means forking while the manager
            # thread runs, which 'fork' cannot do safely.
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child
        # Created lazily by _start_executor_manager_thread().
        self._executor_manager_thread = None
        # Worker processes keyed by pid.
        self._processes = {}
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Counts idle workers: the manager thread releases it for each
        # result from a worker that stays alive; _adjust_process_count()
        # acquires it to reuse an idle worker instead of spawning one.
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        # Next work id to assign in submit().
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False
        # Pipe used by submit(), shutdown() etc. to wake the manager thread.
        self._executor_manager_thread_wakeup = _ThreadWakeup()
        # Allow a few calls beyond max_workers to be queued so workers do not
        # idle. Keep it small: calls in the call queue have already been
        # marked running and can no longer be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Presumably silences broken-pipe errors from the queue's feeder
        # thread when workers are killed. Dead workers are detected through
        # their sentinels anyway.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        # Start the manager thread on first use; submit() calls this with
        # _shutdown_lock held. With 'fork', all workers are launched first.
        # The thread is registered so _python_exit can wake and join it.
        if self._executor_manager_thread is None:
            if not self._safe_to_dynamically_spawn_children: self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # Spawn one more worker unless an idle one is available or
        # max_workers is reached. _processes is None after shutdown().
        if self._processes is None:
            return
        if self._idle_worker_semaphore.acquire(blocking=False):
            return
        process_count = len(self._processes)
        if process_count < self._max_workers:
            self._spawn_process()

    def _launch_processes(self):
        # Spawn workers up to max_workers. Only valid before the manager
        # thread has started (asserted).
        assert not self._executor_manager_thread, (
            'Processes cannot be fork()ed after the thread has started, '
            'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        # Start one worker running _process_worker; record it by pid.
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        # The docstring is copied from _base.Executor.submit below. All state
        # changes happen under _shutdown_lock, so they cannot interleave
        # with shutdown() or with the manager thread's locked sections.
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')
            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)
            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Let the manager thread pick up the new work.
            self._executor_manager_thread_wakeup.wakeup()
            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Return an iterator over fn applied to the items of iterables.

        Arguments are zipped from iterables and grouped into batches of
        chunksize tuples. Each batch runs as one task via _process_chunk,
        and the per-batch result lists are flattened again by
        _chain_from_iterable_of_lists. Timeout handling and result ordering
        come from _base.Executor.map.

        Raises:
            ValueError: if chunksize < 1.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")
        results = super().map(partial(_process_chunk, fn),
                              itertools.batched(zip(*iterables), chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        # The docstring is copied from _base.Executor.shutdown below.
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake the manager thread so it notices the shutdown flag.
                self._executor_manager_thread_wakeup.wakeup()
        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # Drop references to objects holding pipes and processes. The
        # manager thread keeps its own references while it is still running.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__