# Public names of this module: the three queue classes and the three
# exception types they raise.
__all__ = (
'Queue',
'PriorityQueue',
'LifoQueue',
'QueueFull',
'QueueEmpty',
'QueueShutDown',
)
import collections
import heapq
from types import GenericAlias
from . import locks
from . import mixins
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when the queue holds no items."""
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when the queue is already full."""
class QueueShutDown(Exception):
    """Raised when putting to, or getting from, a shut-down Queue."""
class Queue(mixins._LoopBoundMixin):
def __init__(self, maxsize=0):
self._maxsize = maxsize
self._getters = collections.deque()
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
self._is_shutdown = False
def _init(self, maxsize):
self._queue = collections.deque()
def _get(self):
return self._queue.popleft()
def _put(self, item):
self._queue.append(item)
def _wakeup_next(self, waiters):
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
break
def __repr__(self):
return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
def __str__(self):
return f'<{type(self).__name__} {self._format()}>'
__class_getitem__ = classmethod(GenericAlias)
def _format(self):
result = f'maxsize={self._maxsize!r}'
if getattr(self, '_queue', None):
result += f' _queue={list(self._queue)!r}'
if self._getters:
result += f' _getters[{len(self._getters)}]'
if self._putters:
result += f' _putters[{len(self._putters)}]'
if self._unfinished_tasks:
result += f' tasks={self._unfinished_tasks}'
if self._is_shutdown:
result += ' shutdown'
return result
def qsize(self):
return len(self._queue)
@property
def maxsize(self):
return self._maxsize
def empty(self):
return not self._queue
def full(self):
if self._maxsize <= 0:
return False
else:
return self.qsize() >= self._maxsize
async def put(self, item):
while self.full():
if self._is_shutdown:
raise QueueShutDown
putter = self._get_loop().create_future()
self._putters.append(putter)
try:
await putter
except:
putter.cancel() try:
self._putters.remove(putter)
except ValueError:
pass
if not self.full() and not putter.cancelled():
self._wakeup_next(self._putters)
raise
return self.put_nowait(item)
def put_nowait(self, item):
if self._is_shutdown:
raise QueueShutDown
if self.full():
raise QueueFull
self._put(item)
self._unfinished_tasks += 1
self._finished.clear()
self._wakeup_next(self._getters)
async def get(self):
while self.empty():
if self._is_shutdown and self.empty():
raise QueueShutDown
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
await getter
except:
getter.cancel() try:
self._getters.remove(getter)
except ValueError:
pass
if not self.empty() and not getter.cancelled():
self._wakeup_next(self._getters)
raise
return self.get_nowait()
def get_nowait(self):
if self.empty():
if self._is_shutdown:
raise QueueShutDown
raise QueueEmpty
item = self._get()
self._wakeup_next(self._putters)
return item
def task_done(self):
if self._unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._finished.set()
async def join(self):
if self._unfinished_tasks > 0:
await self._finished.wait()
def shutdown(self, immediate=False):
self._is_shutdown = True
if immediate:
while not self.empty():
self._get()
if self._unfinished_tasks > 0:
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._finished.set()
while self._getters:
getter = self._getters.popleft()
if not getter.done():
getter.set_result(None)
while self._putters:
putter = self._putters.popleft()
if not putter.done():
putter.set_result(None)
class PriorityQueue(Queue):
    """A Queue subclass that returns its smallest item first.

    Items are kept in a list managed as a heap by the heapq module, so
    they must be mutually orderable.
    """

    def _init(self, maxsize):
        """Create the underlying container (a list used as a heap)."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap.

        *heappush* defaults to heapq.heappush, bound once when the
        method is defined.
        """
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap.

        *heappop* defaults to heapq.heappop, bound once when the method
        is defined.
        """
        return heappop(self._queue)
class LifoQueue(Queue):
    """A Queue subclass that returns the most recently added item first."""

    def _init(self, maxsize):
        """Create the underlying container (a list used as a stack)."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the item at the end of the list."""
        return self._queue.pop()