import collections
import collections.abc
import concurrent.futures
import errno
import heapq
import itertools
import os
import socket
import stat
import subprocess
import threading
import time
import traceback
import sys
import warnings
import weakref
try:
import ssl
except ImportError: ssl = None
from . import constants
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import protocols
from . import sslproto
from . import staggered
from . import tasks
from . import timeouts
from . import transports
from . import trsock
from .log import logger
__all__ = 'BaseEventLoop','Server',
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
_HAS_IPv6 = hasattr(socket, 'AF_INET6')
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
def _format_handle(handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), tasks.Task):
return repr(cb.__self__)
else:
return str(handle)
def _format_pipe(fd):
if fd == subprocess.PIPE:
return '<pipe>'
elif fd == subprocess.STDOUT:
return '<stdout>'
else:
return repr(fd)
def _set_reuseport(sock):
if not hasattr(socket, 'SO_REUSEPORT'):
raise ValueError('reuse_port not supported by socket module')
else:
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError:
raise ValueError('reuse_port not supported by socket module, '
'SO_REUSEPORT defined but not implemented.')
def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
if not hasattr(socket, 'inet_pton'):
return
if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
host is None:
return None
if type == socket.SOCK_STREAM:
proto = socket.IPPROTO_TCP
elif type == socket.SOCK_DGRAM:
proto = socket.IPPROTO_UDP
else:
return None
if port is None:
port = 0
elif isinstance(port, bytes) and port == b'':
port = 0
elif isinstance(port, str) and port == '':
port = 0
else:
try:
port = int(port)
except (TypeError, ValueError):
return None
if family == socket.AF_UNSPEC:
afs = [socket.AF_INET]
if _HAS_IPv6:
afs.append(socket.AF_INET6)
else:
afs = [family]
if isinstance(host, bytes):
host = host.decode('idna')
if '%' in host:
return None
for af in afs:
try:
socket.inet_pton(af, host)
if _HAS_IPv6 and af == socket.AF_INET6:
return af, type, proto, '', (host, port, flowinfo, scopeid)
else:
return af, type, proto, '', (host, port)
except OSError:
pass
return None
def _interleave_addrinfos(addrinfos, first_address_family_count=1):
addrinfos_by_family = collections.OrderedDict()
for addr in addrinfos:
family = addr[0]
if family not in addrinfos_by_family:
addrinfos_by_family[family] = []
addrinfos_by_family[family].append(addr)
addrinfos_lists = list(addrinfos_by_family.values())
reordered = []
if first_address_family_count > 1:
reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
del addrinfos_lists[0][:first_address_family_count - 1]
reordered.extend(
a for a in itertools.chain.from_iterable(
itertools.zip_longest(*addrinfos_lists)
) if a is not None)
return reordered
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
return
futures._get_loop(fut).stop()
if hasattr(socket, 'TCP_NODELAY'):
def _set_nodelay(sock):
if (sock.family in {socket.AF_INET, socket.AF_INET6} and
sock.type == socket.SOCK_STREAM and
sock.proto == socket.IPPROTO_TCP):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
def _set_nodelay(sock):
pass
def _check_ssl_socket(sock):
if ssl is not None and isinstance(sock, ssl.SSLSocket):
raise TypeError("Socket cannot be of type SSLSocket")
class _SendfileFallbackProtocol(protocols.Protocol):
def __init__(self, transp):
if not isinstance(transp, transports._FlowControlMixin):
raise TypeError("transport should be _FlowControlMixin instance")
self._transport = transp
self._proto = transp.get_protocol()
self._should_resume_reading = transp.is_reading()
self._should_resume_writing = transp._protocol_paused
transp.pause_reading()
transp.set_protocol(self)
if self._should_resume_writing:
self._write_ready_fut = self._transport._loop.create_future()
else:
self._write_ready_fut = None
async def drain(self):
if self._transport.is_closing():
raise ConnectionError("Connection closed by peer")
fut = self._write_ready_fut
if fut is None:
return
await fut
def connection_made(self, transport):
raise RuntimeError("Invalid state: "
"connection should have been established already.")
def connection_lost(self, exc):
if self._write_ready_fut is not None:
if exc is None:
self._write_ready_fut.set_exception(
ConnectionError("Connection is closed by peer"))
else:
self._write_ready_fut.set_exception(exc)
self._proto.connection_lost(exc)
def pause_writing(self):
if self._write_ready_fut is not None:
return
self._write_ready_fut = self._transport._loop.create_future()
def resume_writing(self):
if self._write_ready_fut is None:
return
self._write_ready_fut.set_result(False)
self._write_ready_fut = None
def data_received(self, data):
raise RuntimeError("Invalid state: reading should be paused")
def eof_received(self):
raise RuntimeError("Invalid state: reading should be paused")
async def restore(self):
self._transport.set_protocol(self._proto)
if self._should_resume_reading:
self._transport.resume_reading()
if self._write_ready_fut is not None:
self._write_ready_fut.cancel()
if self._should_resume_writing:
self._proto.resume_writing()
class Server(events.AbstractServer):
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop
self._sockets = sockets
self._clients = weakref.WeakSet()
self._waiters = []
self._protocol_factory = protocol_factory
self._backlog = backlog
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._ssl_shutdown_timeout = ssl_shutdown_timeout
self._serving = False
self._serving_forever_fut = None
def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
def _attach(self, transport):
assert self._sockets is not None
self._clients.add(transport)
def _detach(self, transport):
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()
def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(None)
def _start_serving(self):
if self._serving:
return
self._serving = True
for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
self._protocol_factory, sock, self._ssl_context,
self, self._backlog, self._ssl_handshake_timeout,
self._ssl_shutdown_timeout)
def get_loop(self):
return self._loop
def is_serving(self):
return self._serving
@property
def sockets(self):
if self._sockets is None:
return ()
return tuple(trsock.TransportSocket(s) for s in self._sockets)
def close(self):
sockets = self._sockets
if sockets is None:
return
self._sockets = None
for sock in sockets:
self._loop._stop_serving(sock)
self._serving = False
if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None
if len(self._clients) == 0:
self._wakeup()
def close_clients(self):
for transport in self._clients.copy():
transport.close()
def abort_clients(self):
for transport in self._clients.copy():
transport.abort()
async def start_serving(self):
self._start_serving()
await tasks.sleep(0)
async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
raise RuntimeError(f'server {self!r} is closed')
self._start_serving()
self._serving_forever_fut = self._loop.create_future()
try:
await self._serving_forever_fut
except exceptions.CancelledError:
try:
self.close()
await self.wait_closed()
finally:
raise
finally:
self._serving_forever_fut = None
async def wait_closed(self):
if self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
await waiter
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
self._ready = collections.deque()
self._scheduled = []
self._default_executor = None
self._internal_fds = 0
self._thread_id = None
self._clock_resolution = time.get_clock_info('monotonic').resolution
self._exception_handler = None
self.set_debug(coroutines._is_debug_mode())
self._old_agen_hooks = None
self.slow_callback_duration = 0.1
self._current_handle = None
self._task_factory = None
self._coroutine_origin_tracking_enabled = False
self._coroutine_origin_tracking_saved_depth = None
self._asyncgens = weakref.WeakSet()
self._asyncgens_shutdown_called = False
self._executor_shutdown_called = False
def __repr__(self):
return (
f'<{self.__class__.__name__} running={self.is_running()} '
f'closed={self.is_closed()} debug={self.get_debug()}>'
)
def create_future(self):
return futures.Future(loop=self)
def create_task(self, coro, **kwargs):
self._check_closed()
if self._task_factory is not None:
return self._task_factory(self, coro, **kwargs)
task = tasks.Task(coro, loop=self, **kwargs)
if task._source_traceback:
del task._source_traceback[-1]
try:
return task
finally:
del task
def set_task_factory(self, factory):
if factory is not None and not callable(factory):
raise TypeError('task factory must be a callable or None')
self._task_factory = factory
def get_task_factory(self):
return self._task_factory
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
raise NotImplementedError
def _make_ssl_transport(
self, rawsock, protocol, sslcontext, waiter=None,
*, server_side=False, server_hostname=None,
extra=None, server=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
call_connection_made=True):
raise NotImplementedError
def _make_datagram_transport(self, sock, protocol,
address=None, waiter=None, extra=None):
raise NotImplementedError
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
extra=None):
raise NotImplementedError
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
extra=None):
raise NotImplementedError
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
raise NotImplementedError
def _write_to_self(self):
raise NotImplementedError
def _process_events(self, event_list):
raise NotImplementedError
def _check_closed(self):
if self._closed:
raise RuntimeError('Event loop is closed')
def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
self.call_soon_threadsafe(self.create_task, agen.aclose())
def _asyncgen_firstiter_hook(self, agen):
if self._asyncgens_shutdown_called:
warnings.warn(
f"asynchronous generator {agen!r} was scheduled after "
f"loop.shutdown_asyncgens() call",
ResourceWarning, source=self)
self._asyncgens.add(agen)
async def shutdown_asyncgens(self):
self._asyncgens_shutdown_called = True
if not len(self._asyncgens):
return
closing_agens = list(self._asyncgens)
self._asyncgens.clear()
results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True)
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
self.call_exception_handler({
'message': f'an error occurred during closing of '
f'asynchronous generator {agen!r}',
'exception': result,
'asyncgen': agen
})
async def shutdown_default_executor(self, timeout=None):
self._executor_shutdown_called = True
if self._default_executor is None:
return
future = self.create_future()
thread = threading.Thread(target=self._do_shutdown, args=(future,))
thread.start()
try:
async with timeouts.timeout(timeout):
await future
except TimeoutError:
warnings.warn("The executor did not finishing joining "
f"its threads within {timeout} seconds.",
RuntimeWarning, stacklevel=2)
self._default_executor.shutdown(wait=False)
else:
thread.join()
def _do_shutdown(self, future):
try:
self._default_executor.shutdown(wait=True)
if not self.is_closed():
self.call_soon_threadsafe(futures._set_result_unless_cancelled,
future, None)
except Exception as ex:
if not self.is_closed() and not future.cancelled():
self.call_soon_threadsafe(future.set_exception, ex)
def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
def _run_forever_setup(self):
self._check_closed()
self._check_running()
self._set_coroutine_origin_tracking(self._debug)
self._old_agen_hooks = sys.get_asyncgen_hooks()
self._thread_id = threading.get_ident()
sys.set_asyncgen_hooks(
firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook
)
events._set_running_loop(self)
def _run_forever_cleanup(self):
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
if self._old_agen_hooks is not None:
sys.set_asyncgen_hooks(*self._old_agen_hooks)
self._old_agen_hooks = None
def run_forever(self):
self._run_forever_setup()
try:
while True:
self._run_once()
if self._stopping:
break
finally:
self._run_forever_cleanup()
def run_until_complete(self, future):
self._check_closed()
self._check_running()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
def stop(self):
self._stopping = True
def close(self):
if self.is_running():
raise RuntimeError("Cannot close a running event loop")
if self._closed:
return
if self._debug:
logger.debug("Close %r", self)
self._closed = True
self._ready.clear()
self._scheduled.clear()
self._executor_shutdown_called = True
executor = self._default_executor
if executor is not None:
self._default_executor = None
executor.shutdown(wait=False)
def is_closed(self):
return self._closed
def __del__(self, _warn=warnings.warn):
if not self.is_closed():
_warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
if not self.is_running():
self.close()
def is_running(self):
return (self._thread_id is not None)
def time(self):
return time.monotonic()
def call_later(self, delay, callback, *args, context=None):
if delay is None:
raise TypeError('delay must not be None')
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
def call_at(self, when, callback, *args, context=None):
if when is None:
raise TypeError("when cannot be None")
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_at')
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
return timer
def call_soon(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _check_callback(self, callback, method):
if (coroutines.iscoroutine(callback) or
coroutines._iscoroutinefunction(callback)):
raise TypeError(
f"coroutines cannot be used with {method}()")
if not callable(callback):
raise TypeError(
f'a callable object was expected by {method}(), '
f'got {callback!r}')
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle)
return handle
def _check_thread(self):
if self._thread_id is None:
return
thread_id = threading.get_ident()
if thread_id != self._thread_id:
raise RuntimeError(
"Non-thread-safe operation invoked on an event loop other "
"than the current one")
def call_soon_threadsafe(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = events._ThreadSafeHandle(callback, args, self, context)
self._ready.append(handle)
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self()
return handle
def run_in_executor(self, executor, func, *args):
self._check_closed()
if self._debug:
self._check_callback(func, 'run_in_executor')
if executor is None:
executor = self._default_executor
self._check_default_executor()
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor(
thread_name_prefix='asyncio'
)
self._default_executor = executor
return futures.wrap_future(
executor.submit(func, *args), loop=self)
def set_default_executor(self, executor):
if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
raise TypeError('executor must be ThreadPoolExecutor instance')
self._default_executor = executor
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
msg = [f"{host}:{port!r}"]
if family:
msg.append(f'family={family!r}')
if type:
msg.append(f'type={type!r}')
if proto:
msg.append(f'proto={proto!r}')
if flags:
msg.append(f'flags={flags!r}')
msg = ', '.join(msg)
logger.debug('Get address info %s', msg)
t0 = self.time()
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
dt = self.time() - t0
msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
if dt >= self.slow_callback_duration:
logger.info(msg)
else:
logger.debug(msg)
return addrinfo
async def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
if self._debug:
getaddr_func = self._getaddrinfo_debug
else:
getaddr_func = socket.getaddrinfo
return await self.run_in_executor(
None, getaddr_func, host, port, family, type, proto, flags)
async def getnameinfo(self, sockaddr, flags=0):
return await self.run_in_executor(
None, socket.getnameinfo, sockaddr, flags)
async def sock_sendfile(self, sock, file, offset=0, count=None,
*, fallback=True):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
_check_ssl_socket(sock)
self._check_sendfile_params(sock, file, offset, count)
try:
return await self._sock_sendfile_native(sock, file,
offset, count)
except exceptions.SendfileNotAvailableError as exc:
if not fallback:
raise
return await self._sock_sendfile_fallback(sock, file,
offset, count)
async def _sock_sendfile_native(self, sock, file, offset, count):
raise exceptions.SendfileNotAvailableError(
f"syscall sendfile is not available for socket {sock!r} "
f"and file {file!r} combination")
async def _sock_sendfile_fallback(self, sock, file, offset, count):
if offset:
file.seek(offset)
blocksize = (
min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
)
buf = bytearray(blocksize)
total_sent = 0
try:
while True:
if count:
blocksize = min(count - total_sent, blocksize)
if blocksize <= 0:
break
view = memoryview(buf)[:blocksize]
read = await self.run_in_executor(None, file.readinto, view)
if not read:
break await self.sock_sendall(sock, view[:read])
total_sent += read
return total_sent
finally:
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset + total_sent)
def _check_sendfile_params(self, sock, file, offset, count):
if 'b' not in getattr(file, 'mode', 'b'):
raise ValueError("file should be opened in binary mode")
if not sock.type == socket.SOCK_STREAM:
raise ValueError("only SOCK_STREAM type sockets are supported")
if count is not None:
if not isinstance(count, int):
raise TypeError(
"count must be a positive integer (got {!r})".format(count))
if count <= 0:
raise ValueError(
"count must be a positive integer (got {!r})".format(count))
if not isinstance(offset, int):
raise TypeError(
"offset must be a non-negative integer (got {!r})".format(
offset))
if offset < 0:
raise ValueError(
"offset must be a non-negative integer (got {!r})".format(
offset))
async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
my_exceptions = []
exceptions.append(my_exceptions)
family, type_, proto, _, address = addr_info
sock = None
try:
try:
sock = socket.socket(family=family, type=type_, proto=proto)
sock.setblocking(False)
if local_addr_infos is not None:
for lfamily, _, _, _, laddr in local_addr_infos:
if lfamily != family:
continue
try:
sock.bind(laddr)
break
except OSError as exc:
msg = (
f'error while attempting to bind on '
f'address {laddr!r}: {str(exc).lower()}'
)
exc = OSError(exc.errno, msg)
my_exceptions.append(exc)
else: if my_exceptions:
raise my_exceptions.pop()
else:
raise OSError(f"no matching local address with {family=} found")
await self.sock_connect(sock, address)
return sock
except OSError as exc:
my_exceptions.append(exc)
raise
except:
if sock is not None:
try:
sock.close()
except OSError:
pass
raise
finally:
exceptions = my_exceptions = None
async def create_connection(
self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
happy_eyeballs_delay=None, interleave=None,
all_errors=False):
if server_hostname is not None and not ssl:
raise ValueError('server_hostname is only meaningful with ssl')
if server_hostname is None and ssl:
if not host:
raise ValueError('You must set server_hostname '
'when using ssl without a host')
server_hostname = host
if ssl_handshake_timeout is not None and not ssl:
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
if ssl_shutdown_timeout is not None and not ssl:
raise ValueError(
'ssl_shutdown_timeout is only meaningful with ssl')
if sock is not None:
_check_ssl_socket(sock)
if happy_eyeballs_delay is not None and interleave is None:
interleave = 1
if host is not None or port is not None:
if sock is not None:
raise ValueError(
'host/port and sock can not be specified at the same time')
infos = await self._ensure_resolved(
(host, port), family=family,
type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo() returned empty list')
if local_addr is not None:
laddr_infos = await self._ensure_resolved(
local_addr, family=family,
type=socket.SOCK_STREAM, proto=proto,
flags=flags, loop=self)
if not laddr_infos:
raise OSError('getaddrinfo() returned empty list')
else:
laddr_infos = None
if interleave:
infos = _interleave_addrinfos(infos, interleave)
exceptions = []
if happy_eyeballs_delay is None:
for addrinfo in infos:
try:
sock = await self._connect_sock(
exceptions, addrinfo, laddr_infos)
break
except OSError:
continue
else: sock = (await staggered.staggered_race(
(
lambda addrinfo=addrinfo: self._connect_sock(
exceptions, addrinfo, laddr_infos
)
for addrinfo in infos
),
happy_eyeballs_delay,
loop=self,
))[0]
if sock is None:
exceptions = [exc for sub in exceptions for exc in sub]
try:
if all_errors:
raise ExceptionGroup("create_connection failed", exceptions)
if len(exceptions) == 1:
raise exceptions[0]
elif exceptions:
model = str(exceptions[0])
if all(str(exc) == model for exc in exceptions):
raise exceptions[0]
raise OSError('Multiple exceptions: {}'.format(
', '.join(str(exc) for exc in exceptions)))
else:
raise TimeoutError('create_connection failed')
finally:
exceptions = None
else:
if sock is None:
raise ValueError(
'host and port was not specified and no sock specified')
if sock.type != socket.SOCK_STREAM:
raise ValueError(
f'A Stream Socket was expected, got {sock!r}')
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
if self._debug:
sock = transport.get_extra_info('socket')
logger.debug("%r connected to %s:%r: (%r, %r)",
sock, host, port, transport, protocol)
return transport, protocol
async def _create_connection_transport(
self, sock, protocol_factory, ssl,
server_hostname, server_side=False,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None):
sock.setblocking(False)
protocol = protocol_factory()
waiter = self.create_future()
if ssl:
sslcontext = None if isinstance(ssl, bool) else ssl
transport = self._make_ssl_transport(
sock, protocol, sslcontext, waiter,
server_side=server_side, server_hostname=server_hostname,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
else:
transport = self._make_socket_transport(sock, protocol, waiter)
try:
await waiter
except:
transport.close()
raise
return transport, protocol
async def sendfile(self, transport, file, offset=0, count=None,
*, fallback=True):
if transport.is_closing():
raise RuntimeError("Transport is closing")
mode = getattr(transport, '_sendfile_compatible',
constants._SendfileMode.UNSUPPORTED)
if mode is constants._SendfileMode.UNSUPPORTED:
raise RuntimeError(
f"sendfile is not supported for transport {transport!r}")
if mode is constants._SendfileMode.TRY_NATIVE:
try:
return await self._sendfile_native(transport, file,
offset, count)
except exceptions.SendfileNotAvailableError as exc:
if not fallback:
raise
if not fallback:
raise RuntimeError(
f"fallback is disabled and native sendfile is not "
f"supported for transport {transport!r}")
return await self._sendfile_fallback(transport, file,
offset, count)
async def _sendfile_native(self, transp, file, offset, count):
raise exceptions.SendfileNotAvailableError(
"sendfile syscall is not supported")
async def _sendfile_fallback(self, transp, file, offset, count):
if offset:
file.seek(offset)
blocksize = min(count, 16384) if count else 16384
buf = bytearray(blocksize)
total_sent = 0
proto = _SendfileFallbackProtocol(transp)
try:
while True:
if count:
blocksize = min(count - total_sent, blocksize)
if blocksize <= 0:
return total_sent
view = memoryview(buf)[:blocksize]
read = await self.run_in_executor(None, file.readinto, view)
if not read:
return total_sent transp.write(view[:read])
await proto.drain()
total_sent += read
finally:
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset + total_sent)
await proto.restore()
async def start_tls(self, transport, protocol, sslcontext, *,
server_side=False,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None):
if ssl is None:
raise RuntimeError('Python ssl module is not available')
if not isinstance(sslcontext, ssl.SSLContext):
raise TypeError(
f'sslcontext is expected to be an instance of ssl.SSLContext, '
f'got {sslcontext!r}')
if not getattr(transport, '_start_tls_compatible', False):
raise TypeError(
f'transport {transport!r} is not supported by start_tls()')
waiter = self.create_future()
ssl_protocol = sslproto.SSLProtocol(
self, protocol, sslcontext, waiter,
server_side, server_hostname,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout,
call_connection_made=False)
transport.pause_reading()
transport.set_protocol(ssl_protocol)
conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
resume_cb = self.call_soon(transport.resume_reading)
try:
await waiter
except BaseException:
transport.close()
conmade_cb.cancel()
resume_cb.cancel()
raise
return ssl_protocol._app_transport
async def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
reuse_port=None,
allow_broadcast=None, sock=None):
if sock is not None:
if sock.type == socket.SOCK_STREAM:
raise ValueError(
f'A datagram socket was expected, got {sock!r}')
if (local_addr or remote_addr or
family or proto or flags or
reuse_port or allow_broadcast):
opts = dict(local_addr=local_addr, remote_addr=remote_addr,
family=family, proto=proto, flags=flags,
reuse_port=reuse_port,
allow_broadcast=allow_broadcast)
problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
raise ValueError(
f'socket modifier keyword arguments can not be used '
f'when sock is specified. ({problems})')
sock.setblocking(False)
r_addr = None
else:
if not (local_addr or remote_addr):
if family == 0:
raise ValueError('unexpected address family')
addr_pairs_info = (((family, proto), (None, None)),)
elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
for addr in (local_addr, remote_addr):
if addr is not None and not isinstance(addr, str):
raise TypeError('string is expected')
if local_addr and local_addr[0] not in (0, '\x00'):
try:
if stat.S_ISSOCK(os.stat(local_addr).st_mode):
os.remove(local_addr)
except FileNotFoundError:
pass
except OSError as err:
logger.error('Unable to check or remove stale UNIX '
'socket %r: %r',
local_addr, err)
addr_pairs_info = (((family, proto),
(local_addr, remote_addr)), )
else:
addr_infos = {} for idx, addr in ((0, local_addr), (1, remote_addr)):
if addr is not None:
if not (isinstance(addr, tuple) and len(addr) == 2):
raise TypeError('2-tuple is expected')
infos = await self._ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
proto=proto, flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo() returned empty list')
for fam, _, pro, _, address in infos:
key = (fam, pro)
if key not in addr_infos:
addr_infos[key] = [None, None]
addr_infos[key][idx] = address
addr_pairs_info = [
(key, addr_pair) for key, addr_pair in addr_infos.items()
if not ((local_addr and addr_pair[0] is None) or
(remote_addr and addr_pair[1] is None))]
if not addr_pairs_info:
raise ValueError('can not get address information')
exceptions = []
for ((family, proto),
(local_address, remote_address)) in addr_pairs_info:
sock = None
r_addr = None
try:
sock = socket.socket(
family=family, type=socket.SOCK_DGRAM, proto=proto)
if reuse_port:
_set_reuseport(sock)
if allow_broadcast:
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setblocking(False)
if local_addr:
sock.bind(local_address)
if remote_addr:
if not allow_broadcast:
await self.sock_connect(sock, remote_address)
r_addr = remote_address
except OSError as exc:
if sock is not None:
sock.close()
exceptions.append(exc)
except:
if sock is not None:
sock.close()
raise
else:
break
else:
raise exceptions[0]
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_datagram_transport(
sock, protocol, r_addr, waiter)
if self._debug:
if local_addr:
logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
"created: (%r, %r)",
local_addr, remote_addr, transport, protocol)
else:
logger.debug("Datagram endpoint remote_addr=%r created: "
"(%r, %r)",
remote_addr, transport, protocol)
try:
await waiter
except:
transport.close()
raise
return transport, protocol
async def _ensure_resolved(self, address, *,
family=0, type=socket.SOCK_STREAM,
proto=0, flags=0, loop):
host, port = address[:2]
info = _ipaddr_info(host, port, family, type, proto, *address[2:])
if info is not None:
return [info]
else:
return await loop.getaddrinfo(host, port, family=family, type=type,
proto=proto, flags=flags)
async def _create_server_getaddrinfo(self, host, port, family, flags):
infos = await self._ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
flags=flags, loop=self)
if not infos:
raise OSError(f'getaddrinfo({host!r}) returned empty list')
return infos
async def create_server(
self, protocol_factory, host=None, port=None,
*,
family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None,
keep_alive=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True):
if isinstance(ssl, bool):
raise TypeError('ssl argument must be an SSLContext or None')
if ssl_handshake_timeout is not None and ssl is None:
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
if ssl_shutdown_timeout is not None and ssl is None:
raise ValueError(
'ssl_shutdown_timeout is only meaningful with ssl')
if sock is not None:
_check_ssl_socket(sock)
if host is not None or port is not None:
if sock is not None:
raise ValueError(
'host/port and sock can not be specified at the same time')
if reuse_address is None:
reuse_address = os.name == "posix" and sys.platform != "cygwin"
sockets = []
if host == '':
hosts = [None]
elif (isinstance(host, str) or
not isinstance(host, collections.abc.Iterable)):
hosts = [host]
else:
hosts = host
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
infos = await tasks.gather(*fs)
infos = set(itertools.chain.from_iterable(infos))
completed = False
try:
for res in infos:
af, socktype, proto, canonname, sa = res
try:
sock = socket.socket(af, socktype, proto)
except socket.error:
if self._debug:
logger.warning('create_server() failed to create '
'socket.socket(%r, %r, %r)',
af, socktype, proto, exc_info=True)
continue
sockets.append(sock)
if reuse_address:
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
if reuse_port and af in (socket.AF_INET, socket.AF_INET6):
_set_reuseport(sock)
if keep_alive:
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
if (_HAS_IPv6 and
af == socket.AF_INET6 and
hasattr(socket, 'IPPROTO_IPV6')):
sock.setsockopt(socket.IPPROTO_IPV6,
socket.IPV6_V6ONLY,
True)
try:
sock.bind(sa)
except OSError as err:
msg = ('error while attempting '
'to bind on address %r: %s'
% (sa, str(err).lower()))
if err.errno == errno.EADDRNOTAVAIL:
sockets.pop()
sock.close()
if self._debug:
logger.warning(msg)
continue
raise OSError(err.errno, msg) from None
if not sockets:
raise OSError('could not bind on any address out of %r'
% ([info[4] for info in infos],))
completed = True
finally:
if not completed:
for sock in sockets:
sock.close()
else:
if sock is None:
raise ValueError('Neither host/port nor sock were specified')
if sock.type != socket.SOCK_STREAM:
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
sockets = [sock]
for sock in sockets:
sock.setblocking(False)
server = Server(self, sockets, protocol_factory,
ssl, backlog, ssl_handshake_timeout,
ssl_shutdown_timeout)
if start_serving:
server._start_serving()
await tasks.sleep(0)
if self._debug:
logger.info("%r is serving", server)
return server
async def connect_accepted_socket(
self, protocol_factory, sock,
*, ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None):
if sock.type != socket.SOCK_STREAM:
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
if ssl_handshake_timeout is not None and not ssl:
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
if ssl_shutdown_timeout is not None and not ssl:
raise ValueError(
'ssl_shutdown_timeout is only meaningful with ssl')
_check_ssl_socket(sock)
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, '', server_side=True,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
if self._debug:
sock = transport.get_extra_info('socket')
logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
return transport, protocol
async def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
try:
await waiter
except:
transport.close()
raise
if self._debug:
logger.debug('Read pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
return transport, protocol
async def connect_write_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
try:
await waiter
except:
transport.close()
raise
if self._debug:
logger.debug('Write pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
return transport, protocol
def _log_subprocess(self, msg, stdin, stdout, stderr):
info = [msg]
if stdin is not None:
info.append(f'stdin={_format_pipe(stdin)}')
if stdout is not None and stderr == subprocess.STDOUT:
info.append(f'stdout=stderr={_format_pipe(stdout)}')
else:
if stdout is not None:
info.append(f'stdout={_format_pipe(stdout)}')
if stderr is not None:
info.append(f'stderr={_format_pipe(stderr)}')
logger.debug(' '.join(info))
async def subprocess_shell(self, protocol_factory, cmd, *,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False,
shell=True, bufsize=0,
encoding=None, errors=None, text=None,
**kwargs):
if not isinstance(cmd, (bytes, str)):
raise ValueError("cmd must be a string")
if universal_newlines:
raise ValueError("universal_newlines must be False")
if not shell:
raise ValueError("shell must be True")
if bufsize != 0:
raise ValueError("bufsize must be 0")
if text:
raise ValueError("text must be False")
if encoding is not None:
raise ValueError("encoding must be None")
if errors is not None:
raise ValueError("errors must be None")
protocol = protocol_factory()
debug_log = None
if self._debug:
debug_log = 'run shell command %r' % cmd
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = await self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug and debug_log is not None:
logger.info('%s: %r', debug_log, transport)
return transport, protocol
async def subprocess_exec(self, protocol_factory, program, *args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=False,
shell=False, bufsize=0,
encoding=None, errors=None, text=None,
**kwargs):
if universal_newlines:
raise ValueError("universal_newlines must be False")
if shell:
raise ValueError("shell must be False")
if bufsize != 0:
raise ValueError("bufsize must be 0")
if text:
raise ValueError("text must be False")
if encoding is not None:
raise ValueError("encoding must be None")
if errors is not None:
raise ValueError("errors must be None")
popen_args = (program,) + args
protocol = protocol_factory()
debug_log = None
if self._debug:
debug_log = f'execute program {program!r}'
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = await self._make_subprocess_transport(
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug and debug_log is not None:
logger.info('%s: %r', debug_log, transport)
return transport, protocol
def get_exception_handler(self):
return self._exception_handler
def set_exception_handler(self, handler):
if handler is not None and not callable(handler):
raise TypeError(f'A callable object or None is expected, '
f'got {handler!r}')
self._exception_handler = handler
def default_exception_handler(self, context):
message = context.get('message')
if not message:
message = 'Unhandled exception in event loop'
exception = context.get('exception')
if exception is not None:
exc_info = (type(exception), exception, exception.__traceback__)
else:
exc_info = False
if ('source_traceback' not in context and
self._current_handle is not None and
self._current_handle._source_traceback):
context['handle_traceback'] = \
self._current_handle._source_traceback
log_lines = [message]
for key in sorted(context):
if key in {'message', 'exception'}:
continue
value = context[key]
if key == 'source_traceback':
tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n'
value += tb.rstrip()
elif key == 'handle_traceback':
tb = ''.join(traceback.format_list(value))
value = 'Handle created at (most recent call last):\n'
value += tb.rstrip()
else:
value = repr(value)
log_lines.append(f'{key}: {value}')
logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
if self._exception_handler is None:
try:
self.default_exception_handler(context)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
logger.error('Exception in default exception handler',
exc_info=True)
else:
try:
ctx = None
thing = context.get("task")
if thing is None:
thing = context.get("future")
if thing is None:
thing = context.get("handle")
if thing is not None and hasattr(thing, "get_context"):
ctx = thing.get_context()
if ctx is not None and hasattr(ctx, "run"):
ctx.run(self._exception_handler, self, context)
else:
self._exception_handler(self, context)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
try:
self.default_exception_handler({
'message': 'Unhandled error in exception handler',
'exception': exc,
'context': context,
})
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
logger.error('Exception in default exception handler '
'while handling an unexpected error '
'in custom exception handler',
exc_info=True)
def _add_callback(self, handle):
if not handle._cancelled:
self._ready.append(handle)
def _add_callback_signalsafe(self, handle):
self._add_callback(handle)
self._write_to_self()
def _timer_handle_cancelled(self, handle):
if handle._scheduled:
self._timer_cancelled_count += 1
def _run_once(self):
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
timeout = self._scheduled[0]._when - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
event_list = self._selector.select(timeout)
self._process_events(event_list)
event_list = None
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None
def _set_coroutine_origin_tracking(self, enabled):
if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
return
if enabled:
self._coroutine_origin_tracking_saved_depth = (
sys.get_coroutine_origin_tracking_depth())
sys.set_coroutine_origin_tracking_depth(
constants.DEBUG_STACK_DEPTH)
else:
sys.set_coroutine_origin_tracking_depth(
self._coroutine_origin_tracking_saved_depth)
self._coroutine_origin_tracking_enabled = enabled
def get_debug(self):
return self._debug
def set_debug(self, enabled):
self._debug = enabled
if self.is_running():
self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)