import _socket
from _socket import *
import io
import os
import sys
from enum import IntEnum, IntFlag
# errno is optional: when it cannot be imported the name is bound to None and
# the getattr() calls below fall back to the hard-coded numeric defaults.
try:
    import errno
except ImportError:
    errno = None
EBADF = getattr(errno, 'EBADF', 9)
EAGAIN = getattr(errno, 'EAGAIN', 11)
EWOULDBLOCK = getattr(errno, 'EWOULDBLOCK', 11)

# Public API: the helpers defined in this module plus whatever
# os._get_exports_list() reports for the _socket extension module.
__all__ = ["fromfd", "getfqdn", "create_connection", "create_server",
           "has_dualstack_ipv6", "AddressFamily", "SocketKind"]
__all__.extend(os._get_exports_list(_socket))

# Build the AddressFamily / SocketKind / MsgFlag / AddressInfo enums from this
# module's upper-case constants; each filter lambda selects the constants by
# their name prefix (AF_, SOCK_, MSG_, AI_).
IntEnum._convert_(
        'AddressFamily',
        __name__,
        lambda C: C.isupper() and C.startswith('AF_'))

IntEnum._convert_(
        'SocketKind',
        __name__,
        lambda C: C.isupper() and C.startswith('SOCK_'))

IntFlag._convert_(
        'MsgFlag',
        __name__,
        lambda C: C.isupper() and C.startswith('MSG_'))

IntFlag._convert_(
        'AddressInfo',
        __name__,
        lambda C: C.isupper() and C.startswith('AI_'))

# Loopback addresses used by _fallback_socketpair().
_LOCALHOST = '127.0.0.1'
_LOCALHOST_V6 = '::1'
def _intenum_converter(value, enum_klass):
    """Return *value* as a member of *enum_klass* when it is one.

    If *enum_klass* rejects the value with ValueError, the value is handed
    back unchanged.
    """
    try:
        member = enum_klass(value)
    except ValueError:
        member = value
    return member
# On Windows, expose a table mapping Winsock / overlapped-I/O error codes to
# human-readable messages.  The table is only defined (and exported) when
# sys.platform starts with "win".
if sys.platform.lower().startswith("win"):
    errorTab = {
        6: "Specified event object handle is invalid.",
        8: "Insufficient memory available.",
        87: "One or more parameters are invalid.",
        995: "Overlapped operation aborted.",
        996: "Overlapped I/O event object not in signaled state.",
        997: "Overlapped operation will complete later.",
        10004: "The operation was interrupted.",
        10009: "A bad file handle was passed.",
        10013: "Permission denied.",
        10014: "A fault occurred on the network??",
        10022: "An invalid operation was attempted.",
        10024: "Too many open files.",
        10035: "The socket operation would block.",
        10036: "A blocking operation is already in progress.",
        10037: "Operation already in progress.",
        10038: "Socket operation on nonsocket.",
        10039: "Destination address required.",
        10040: "Message too long.",
        10041: "Protocol wrong type for socket.",
        10042: "Bad protocol option.",
        10043: "Protocol not supported.",
        10044: "Socket type not supported.",
        10045: "Operation not supported.",
        10046: "Protocol family not supported.",
        10047: "Address family not supported by protocol family.",
        10048: "The network address is in use.",
        10049: "Cannot assign requested address.",
        10050: "Network is down.",
        10051: "Network is unreachable.",
        10052: "Network dropped connection on reset.",
        10053: "Software caused connection abort.",
        10054: "The connection has been reset.",
        10055: "No buffer space available.",
        10056: "Socket is already connected.",
        10057: "Socket is not connected.",
        10058: "The network has been shut down.",
        10059: "Too many references.",
        10060: "The operation timed out.",
        10061: "Connection refused.",
        10062: "Cannot translate name.",
        10063: "The name is too long.",
        10064: "The host is down.",
        10065: "The host is unreachable.",
        10066: "Directory not empty.",
        10067: "Too many processes.",
        10068: "User quota exceeded.",
        10069: "Disk quota exceeded.",
        10070: "Stale file handle reference.",
        10071: "Item is remote.",
        10091: "Network subsystem is unavailable.",
        10092: "Winsock.dll version out of range.",
        10093: "Successful WSAStartup not yet performed.",
        10101: "Graceful shutdown in progress.",
        10102: "No more results from WSALookupServiceNext.",
        10103: "Call has been canceled.",
        10104: "Procedure call table is invalid.",
        10105: "Service provider is invalid.",
        10106: "Service provider failed to initialize.",
        10107: "System call failure.",
        10108: "Service not found.",
        10109: "Class type not found.",
        10110: "No more results from WSALookupServiceNext.",
        10111: "Call was canceled.",
        10112: "Database query was refused.",
        11001: "Host not found.",
        11002: "Nonauthoritative host not found.",
        11003: "This is a nonrecoverable error.",
        11004: "Valid name, no data record requested type.",
        11005: "QoS receivers.",
        11006: "QoS senders.",
        11007: "No QoS senders.",
        11008: "QoS no receivers.",
        11009: "QoS request confirmed.",
        11010: "QoS admission error.",
        11011: "QoS policy failure.",
        11012: "QoS bad style.",
        11013: "QoS bad object.",
        11014: "QoS traffic control error.",
        11015: "QoS generic error.",
        11016: "QoS service type error.",
        11017: "QoS flowspec error.",
        11018: "Invalid QoS provider buffer.",
        11019: "Invalid QoS filter style.",  # WSA_QOS_EFILTERSTYLE
        # 11020 is WSA_QOS_EFILTERTYPE; it previously duplicated the text of
        # 11019, which made the two codes indistinguishable in messages.
        11020: "Invalid QoS filter type.",
        11021: "Incorrect QoS filter count.",
        11022: "Invalid QoS object length.",
        11023: "Incorrect QoS flow count.",
        11024: "Unrecognized QoS object.",
        11025: "Invalid QoS policy object.",
        11026: "Invalid QoS flow descriptor.",
        11027: "Invalid QoS provider-specific flowspec.",
        11028: "Invalid QoS provider-specific filterspec.",
        11029: "Invalid QoS shape discard mode object.",
        11030: "Invalid QoS shaping rate object.",
        11031: "Reserved policy QoS element type."
    }
    __all__.append("errorTab")
class _GiveupOnSendfile(Exception):
    """Signal that os.sendfile() cannot be used and plain send() should be tried."""
class socket(_socket.socket):
    """Subclass of _socket.socket adding makefile(), sendfile(), context
    manager support and enum-typed ``family`` / ``type`` properties.

    Closing is reference counted: close() only releases the underlying
    socket once every SocketIO object created by makefile() is closed too.
    """

    # _io_refs counts open SocketIO objects; _closed records that close()
    # (or detach()) has been requested on this object.
    __slots__ = ["__weakref__", "_io_refs", "_closed"]

    def __init__(self, family=-1, type=-1, proto=-1, fileno=None):
        """Create the socket.

        The -1 placeholders are replaced by AF_INET / SOCK_STREAM / 0 only
        when no *fileno* is given; with a *fileno* they are passed through
        to _socket.socket unchanged.
        """
        if fileno is None:
            if family == -1:
                family = AF_INET
            if type == -1:
                type = SOCK_STREAM
            if proto == -1:
                proto = 0
        _socket.socket.__init__(self, family, type, proto, fileno)
        self._io_refs = 0
        self._closed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the ``with`` block closes the socket unless that already
        # happened.
        if not self._closed:
            self.close()

    def __repr__(self):
        """Return a description showing the real class name, descriptor,
        family/type/proto and, while open, the local and remote addresses.
        """
        closed = getattr(self, '_closed', False)
        s = "<%s.%s%s fd=%i, family=%s, type=%s, proto=%i" \
            % (self.__class__.__module__,
               self.__class__.__qualname__,
               " [closed]" if closed else "",
               self.fileno(),
               self.family,
               self.type,
               self.proto)
        if not closed:
            # Address lookups are best effort; failures just omit the field.
            try:
                laddr = self.getsockname()
                if laddr:
                    s += ", laddr=%s" % str(laddr)
            except (error, AttributeError):
                pass
            try:
                raddr = self.getpeername()
                if raddr:
                    s += ", raddr=%s" % str(raddr)
            except (error, AttributeError):
                pass
        s += '>'
        return s

    def __getstate__(self):
        # Sockets cannot be pickled.
        raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")

    def dup(self):
        """dup() -> socket object

        Duplicate the descriptor and return a new object of the same class
        wrapping it, carrying over this socket's timeout.
        """
        fd = dup(self.fileno())
        sock = self.__class__(self.family, self.type, self.proto, fileno=fd)
        sock.settimeout(self.gettimeout())
        return sock

    def accept(self):
        """accept() -> (socket object, address info)

        Call self._accept() and wrap the returned descriptor in a new
        ``socket`` object with this socket's family, type and proto.
        """
        fd, addr = self._accept()
        sock = socket(self.family, self.type, self.proto, fileno=fd)
        # With no global default timeout but a non-zero timeout on the
        # listening socket, explicitly put the new socket in blocking mode.
        if getdefaulttimeout() is None and self.gettimeout():
            sock.setblocking(True)
        return sock, addr

    def makefile(self, mode="r", buffering=None, *,
                 encoding=None, errors=None, newline=None):
        """makefile(...) -> an I/O stream connected to the socket

        *mode* may only contain 'r', 'w' and 'b'.  With buffering == 0 the
        raw SocketIO object is returned (binary mode only); otherwise it is
        wrapped in an io.Buffered* object and, for text mode, additionally
        in an io.TextIOWrapper.

        Raises ValueError for an invalid mode or for unbuffered text mode.
        """
        if not set(mode) <= {"r", "w", "b"}:
            raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))
        writing = "w" in mode
        # Reading is the default when neither 'r' nor 'w' is given.
        reading = "r" in mode or not writing
        assert reading or writing
        binary = "b" in mode
        rawmode = ""
        if reading:
            rawmode += "r"
        if writing:
            rawmode += "w"
        raw = SocketIO(self, rawmode)
        # Each SocketIO keeps the socket alive until it is closed.
        self._io_refs += 1
        if buffering is None:
            buffering = -1
        if buffering < 0:
            buffering = io.DEFAULT_BUFFER_SIZE
        if buffering == 0:
            if not binary:
                raise ValueError("unbuffered streams must be binary")
            return raw
        if reading and writing:
            buffer = io.BufferedRWPair(raw, raw, buffering)
        elif reading:
            buffer = io.BufferedReader(raw, buffering)
        else:
            assert writing
            buffer = io.BufferedWriter(raw, buffering)
        if binary:
            return buffer
        encoding = io.text_encoding(encoding)
        text = io.TextIOWrapper(buffer, encoding, errors, newline)
        text.mode = mode
        return text

    if hasattr(os, 'sendfile'):

        def _sendfile_use_sendfile(self, file, offset=0, count=None):
            """Send *file* using os.sendfile() and return the bytes sent.

            Raises _GiveupOnSendfile when os.sendfile() cannot be used for
            this file (no usable fileno, fstat failure, or an OSError before
            anything was sent) so the caller can fall back to send().
            """
            import selectors

            self._check_sendfile_params(file, offset, count)
            sockno = self.fileno()
            try:
                fileno = file.fileno()
            except (AttributeError, io.UnsupportedOperation) as err:
                raise _GiveupOnSendfile(err)  # no real file descriptor
            try:
                fsize = os.fstat(fileno).st_size
            except OSError as err:
                raise _GiveupOnSendfile(err)  # descriptor cannot be stat'ed
            if not fsize:
                return 0  # empty file
            # Cap each sendfile() request at 1 GiB.
            blocksize = min(count or fsize, 2 ** 30)
            timeout = self.gettimeout()
            if timeout == 0:
                raise ValueError("non-blocking sockets are not supported")
            # Prefer poll() when the selectors module offers it.
            if hasattr(selectors, 'PollSelector'):
                selector = selectors.PollSelector()
            else:
                selector = selectors.SelectSelector()
            selector.register(sockno, selectors.EVENT_WRITE)

            total_sent = 0
            # Bind to locals for the loop below.
            selector_select = selector.select
            os_sendfile = os.sendfile
            try:
                while True:
                    # With a timeout set, wait for writability first.
                    if timeout and not selector_select(timeout):
                        raise TimeoutError('timed out')
                    if count:
                        blocksize = min(count - total_sent, blocksize)
                        if blocksize <= 0:
                            break
                    try:
                        sent = os_sendfile(sockno, fileno, offset, blocksize)
                    except BlockingIOError:
                        if not timeout:
                            # Block until the socket is writable instead of
                            # spinning.
                            selector_select()
                        continue
                    except OSError as err:
                        if total_sent == 0:
                            # Nothing sent yet, so falling back to send()
                            # is still possible.
                            raise _GiveupOnSendfile(err)
                        raise err from None
                    else:
                        if sent == 0:
                            break  # EOF
                        offset += sent
                        total_sent += sent
                return total_sent
            finally:
                # Leave the file positioned just past the data that was sent.
                if total_sent > 0 and hasattr(file, 'seek'):
                    file.seek(offset)
    else:
        def _sendfile_use_sendfile(self, file, offset=0, count=None):
            raise _GiveupOnSendfile(
                "os.sendfile() not available on this platform")

    def _sendfile_use_send(self, file, offset=0, count=None):
        """Send *file* by reading chunks and calling send(); return the
        number of bytes sent.

        Raises ValueError for non-blocking sockets (timeout == 0).
        """
        self._check_sendfile_params(file, offset, count)
        if self.gettimeout() == 0:
            raise ValueError("non-blocking sockets are not supported")
        if offset:
            file.seek(offset)
        blocksize = min(count, 8192) if count else 8192
        total_sent = 0
        # Bind to locals for the loop below.
        file_read = file.read
        sock_send = self.send
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        break
                data = memoryview(file_read(blocksize))
                if not data:
                    break  # EOF
                # Keep sending until the whole chunk has gone out.
                while True:
                    try:
                        sent = sock_send(data)
                    except BlockingIOError:
                        continue
                    else:
                        total_sent += sent
                        if sent < len(data):
                            data = data[sent:]
                        else:
                            break
            return total_sent
        finally:
            # Leave the file positioned just past the data that was sent.
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)

    def _check_sendfile_params(self, file, offset, count):
        """Validate sendfile() arguments.

        Raises ValueError for a file whose ``mode`` lacks 'b', for a socket
        whose type does not include SOCK_STREAM, or for count <= 0; raises
        TypeError when *count* is neither None nor an int.
        """
        # Objects without a ``mode`` attribute are assumed to be binary.
        if 'b' not in getattr(file, 'mode', 'b'):
            raise ValueError("file should be opened in binary mode")
        if not self.type & SOCK_STREAM:
            raise ValueError("only SOCK_STREAM type sockets are supported")
        if count is not None:
            if not isinstance(count, int):
                raise TypeError(
                    "count must be a positive integer (got {!r})".format(count))
            if count <= 0:
                raise ValueError(
                    "count must be a positive integer (got {!r})".format(count))

    def sendfile(self, file, offset=0, count=None):
        """sendfile(file[, offset[, count]]) -> sent

        Send *file* starting at *offset*, at most *count* bytes when given,
        and return the number of bytes sent.  os.sendfile() is tried first;
        if that path raises _GiveupOnSendfile, plain send() is used instead.
        """
        try:
            return self._sendfile_use_sendfile(file, offset, count)
        except _GiveupOnSendfile:
            return self._sendfile_use_send(file, offset, count)

    def _decref_socketios(self):
        # Called by SocketIO.close(): drop one reference and finish a
        # pending close() request.
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def _real_close(self, _ss=_socket.socket):
        # _socket.socket is bound as a default argument so it is available
        # without a global lookup at call time.
        _ss.close(self)

    def close(self):
        # Mark as closed; the underlying socket is only closed once no
        # SocketIO objects remain.
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def detach(self):
        """detach() -> file descriptor

        Mark this object as closed and return the result of
        _socket.socket.detach().
        """
        self._closed = True
        return super().detach()

    @property
    def family(self):
        """The socket family, as an AddressFamily member when possible."""
        return _intenum_converter(super().family, AddressFamily)

    @property
    def type(self):
        """The socket type, as a SocketKind member when possible."""
        return _intenum_converter(super().type, SocketKind)

    # Inheritability is handled through handle-based os functions on 'nt'
    # and descriptor-based ones elsewhere.
    if os.name == 'nt':
        def get_inheritable(self):
            return os.get_handle_inheritable(self.fileno())
        def set_inheritable(self, inheritable):
            os.set_handle_inheritable(self.fileno(), inheritable)
    else:
        def get_inheritable(self):
            return os.get_inheritable(self.fileno())
        def set_inheritable(self, inheritable):
            os.set_inheritable(self.fileno(), inheritable)
    get_inheritable.__doc__ = "Get the inheritable flag of the socket"
    set_inheritable.__doc__ = "Set the inheritable flag of the socket"
def fromfd(fd, family, type, proto=0):
    """fromfd(fd, family, type[, proto]) -> socket object

    Wrap a duplicate of descriptor *fd* in a new socket object; the
    descriptor passed in is left untouched.
    """
    return socket(family, type, proto, dup(fd))
if hasattr(_socket.socket, "sendmsg"):
    def send_fds(sock, buffers, fds, flags=0, address=None):
        """send_fds(sock, buffers, fds[, flags[, address]]) -> integer

        Send the file descriptors in *fds* over *sock* as SCM_RIGHTS
        ancillary data, together with the payload in *buffers*.

        *flags* and *address* are forwarded to sock.sendmsg(); previously
        they were accepted but silently dropped.  Returns whatever
        sock.sendmsg() returns.
        """
        import array

        ancillary = [(_socket.SOL_SOCKET, _socket.SCM_RIGHTS,
                      array.array("i", fds))]
        return sock.sendmsg(buffers, ancillary, flags, address)
    __all__.append("send_fds")
if hasattr(_socket.socket, "recvmsg"):
    def recv_fds(sock, bufsize, maxfds, flags=0):
        """recv_fds(sock, bufsize, maxfds[, flags]) ->
        (data, list of file descriptors, msg_flags, address)

        Receive up to *maxfds* file descriptors sent as SCM_RIGHTS
        ancillary data, along with up to *bufsize* bytes of payload.

        *flags* is forwarded to sock.recvmsg(); previously it was accepted
        but never used.  The third element of the result is the flags value
        reported by recvmsg() for the received message.
        """
        import array

        fds = array.array("i")
        msg, ancdata, msg_flags, addr = sock.recvmsg(
            bufsize, _socket.CMSG_LEN(maxfds * fds.itemsize), flags)
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if (cmsg_level == _socket.SOL_SOCKET
                    and cmsg_type == _socket.SCM_RIGHTS):
                # Ignore a trailing partial item, if any.
                usable = len(cmsg_data) - (len(cmsg_data) % fds.itemsize)
                fds.frombytes(cmsg_data[:usable])
        return msg, list(fds), msg_flags, addr
    __all__.append("recv_fds")
if hasattr(_socket.socket, "share"):
    def fromshare(info):
        """fromshare(info) -> socket object

        Build a socket object from *info*, which is passed to the socket
        constructor in place of a file descriptor (family, type and proto
        are all given as 0).
        """
        placeholder = 0
        return socket(placeholder, placeholder, placeholder, info)
    __all__.append("fromshare")
def _fallback_socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
    """Emulate socketpair() with two TCP sockets connected over loopback.

    Only AF_INET / AF_INET6, SOCK_STREAM and protocol 0 are accepted;
    anything else raises ValueError.  Returns (server_side, client_side).
    Raises ConnectionError when the accepted connection does not turn out
    to be the one this function initiated.
    """
    if family == AF_INET:
        loopback = _LOCALHOST
    elif family == AF_INET6:
        loopback = _LOCALHOST_V6
    else:
        raise ValueError("Only AF_INET and AF_INET6 socket address families "
                         "are supported")
    if type != SOCK_STREAM:
        raise ValueError("Only SOCK_STREAM socket type is supported")
    if proto != 0:
        raise ValueError("Only protocol zero is supported")

    # Listen on an ephemeral loopback port, connect to it, and accept.
    listener = socket(family, type, proto)
    try:
        listener.bind((loopback, 0))
        listener.listen()
        bound_host, bound_port = listener.getsockname()[:2]
        client = socket(family, type, proto)
        try:
            # Start the connect without blocking so accept() can run in
            # this same thread.
            client.setblocking(False)
            try:
                client.connect((bound_host, bound_port))
            except (BlockingIOError, InterruptedError):
                pass
            client.setblocking(True)
            server, _ = listener.accept()
        except BaseException:
            client.close()
            raise
    finally:
        listener.close()

    # Make sure the two ends really are connected to each other.
    try:
        endpoints_match = (
            server.getsockname() == client.getpeername()
            and client.getsockname() == server.getpeername()
        )
        if not endpoints_match:
            raise ConnectionError("Unexpected peer connection")
    except BaseException:
        server.close()
        client.close()
        raise
    return (server, client)
# Use the platform socketpair() when _socket provides one, otherwise the
# loopback-based emulation.
if hasattr(_socket, "socketpair"):
    def socketpair(family=None, type=SOCK_STREAM, proto=0):
        if family is None:
            # AF_UNIX is not defined on every platform.
            try:
                family = AF_UNIX
            except NameError:
                family = AF_INET
        raw_first, raw_second = _socket.socketpair(family, type, proto)
        # Re-wrap the detached descriptors in this module's socket class.
        first = socket(family, type, proto, raw_first.detach())
        second = socket(family, type, proto, raw_second.detach())
        return first, second

else:
    socketpair = _fallback_socketpair
    __all__.append("socketpair")

socketpair.__doc__ = """socketpair([family[, type[, proto]]]) -> (socket object, socket object)
Create a pair of socket objects from the sockets returned by the platform
socketpair() function.
The arguments are the same as for socket() except the default family is AF_UNIX
if defined on the platform; otherwise, the default is AF_INET.
"""
# errno values that SocketIO treats as "operation would block".
_blocking_errnos = {EAGAIN, EWOULDBLOCK}
class SocketIO(io.RawIOBase):
    """Raw I/O object that reads from and writes to a socket.

    Wraps the socket's recv_into() and send() behind the io.RawIOBase
    interface.  The socket is notified through _decref_socketios() when
    this object is closed.
    """

    def __init__(self, sock, mode):
        if mode not in ("r", "w", "rw", "rb", "wb", "rwb"):
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # The stored mode always carries the binary flag.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in self._mode
        self._writing = "w" in self._mode
        self._timeout_occurred = False

    def readinto(self, b):
        """Read into the writable buffer *b* and return the byte count.

        Returns None when the socket reports a would-block errno.  After a
        timeout has been raised once, further reads raise OSError.
        """
        self._checkClosed()
        self._checkReadable()
        if self._timeout_occurred:
            raise OSError("cannot read from timed out object")
        try:
            received = self._sock.recv_into(b)
        except timeout:
            self._timeout_occurred = True
            raise
        except error as exc:
            if exc.errno not in _blocking_errnos:
                raise
            return None
        else:
            return received

    def write(self, b):
        """Send the bytes-like object *b* and return the number of bytes
        written, or None when the socket reports a would-block errno.
        """
        self._checkClosed()
        self._checkWritable()
        try:
            written = self._sock.send(b)
        except error as exc:
            if exc.errno not in _blocking_errnos:
                raise
            return None
        else:
            return written

    def readable(self):
        """Return True if this object was opened for reading."""
        if self.closed:
            raise ValueError("I/O operation on closed socket.")
        return self._reading

    def writable(self):
        """Return True if this object was opened for writing."""
        if self.closed:
            raise ValueError("I/O operation on closed socket.")
        return self._writing

    def seekable(self):
        """Return the base class's seekable() answer for an open object."""
        if self.closed:
            raise ValueError("I/O operation on closed socket.")
        return super().seekable()

    def fileno(self):
        """Return the file descriptor of the underlying socket."""
        self._checkClosed()
        return self._sock.fileno()

    @property
    def name(self):
        # The descriptor number while open, -1 afterwards.
        return -1 if self.closed else self.fileno()

    @property
    def mode(self):
        return self._mode

    def close(self):
        """Close this object and release its reference on the socket.

        Calling close() again is a no-op.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            self._sock._decref_socketios()
            self._sock = None
def getfqdn(name=''):
    """Return a fully qualified domain name for *name*.

    An empty *name*, '0.0.0.0' or '::' stands for the local host.  The
    primary hostname reported by gethostbyaddr() is preferred, then the
    aliases; the first one containing a dot wins.  If none does, the
    primary hostname is returned; if the lookup fails, the (stripped)
    name itself is returned.
    """
    name = name.strip()
    if name in ('', '0.0.0.0', '::'):
        name = gethostname()
    try:
        hostname, aliases, _addresses = gethostbyaddr(name)
    except error:
        return name
    for candidate in [hostname, *aliases]:
        if '.' in candidate:
            return candidate
    return hostname
# Sentinel meaning "leave the new socket's timeout alone".
_GLOBAL_DEFAULT_TIMEOUT = object()

def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT,
                      source_address=None, *, all_errors=False):
    """Connect to *address* and return the socket object.

    *address* is a (host, port) pair.  Every result of getaddrinfo() for
    SOCK_STREAM is tried in order until one connects.  *timeout*, when
    given, is applied to the socket before connecting; *source_address*,
    when truthy, is bound before connecting.

    If every attempt fails, the last error is raised, or an ExceptionGroup
    of all of them when *all_errors* is true.  If getaddrinfo() yields
    nothing, ``error`` is raised.
    """
    host, port = address
    failures = []
    for family, kind, proto, _canonname, sockaddr in getaddrinfo(
            host, port, 0, SOCK_STREAM):
        conn = None
        try:
            conn = socket(family, kind, proto)
            if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
                conn.settimeout(timeout)
            if source_address:
                conn.bind(source_address)
            conn.connect(sockaddr)
            # Drop references to earlier errors before returning.
            failures.clear()
            return conn

        except error as exc:
            if all_errors:
                failures.append(exc)
            else:
                # Keep only the most recent error.
                failures[:] = [exc]
            if conn is not None:
                conn.close()

    if not failures:
        raise error("getaddrinfo returns an empty list")
    try:
        if all_errors:
            raise ExceptionGroup("create_connection failed", failures)
        raise failures[0]
    finally:
        # Empty the list so it does not keep the raised exception alive.
        failures.clear()
def has_dualstack_ipv6():
    """Return True if an AF_INET6 socket can have IPV6_V6ONLY switched off.

    False is returned when IPv6 support or the needed constants are
    missing, or when creating the probe socket or setting the option fails.
    """
    prerequisites_met = (
        has_ipv6
        and hasattr(_socket, 'IPPROTO_IPV6')
        and hasattr(_socket, 'IPV6_V6ONLY')
    )
    if not prerequisites_met:
        return False
    try:
        with socket(AF_INET6, SOCK_STREAM) as probe:
            probe.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0)
    except error:
        return False
    return True
def create_server(address, *, family=AF_INET, backlog=None, reuse_port=False,
                  dualstack_ipv6=False):
    """Create a listening SOCK_STREAM socket bound to *address*.

    *family* is the address family of the socket.  *backlog*, when not
    None, is passed to listen().  *reuse_port* sets SO_REUSEPORT for
    AF_INET / AF_INET6 sockets.  *dualstack_ipv6* clears IPV6_V6ONLY and
    requires family AF_INET6.

    Raises ValueError when a requested option is unsupported or
    inconsistent.  A bind failure is re-raised as ``error`` with the
    address appended to the message.  The socket is closed if any
    ``error`` occurs during setup.
    """
    if reuse_port and not hasattr(_socket, "SO_REUSEPORT"):
        raise ValueError("SO_REUSEPORT not supported on this platform")
    if dualstack_ipv6:
        if not has_dualstack_ipv6():
            raise ValueError("dualstack_ipv6 not supported on this platform")
        if family != AF_INET6:
            raise ValueError("dualstack_ipv6 requires AF_INET6 family")
    server = socket(family, SOCK_STREAM)
    try:
        # SO_REUSEADDR is only attempted outside 'nt' and 'cygwin', and a
        # failure to set it is ignored.
        wants_reuseaddr = (os.name not in ('nt', 'cygwin')
                           and hasattr(_socket, 'SO_REUSEADDR'))
        if wants_reuseaddr:
            try:
                server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            except error:
                pass
        if reuse_port and family in (AF_INET, AF_INET6):
            server.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
        if has_ipv6 and family == AF_INET6:
            if dualstack_ipv6:
                server.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 0)
            elif (hasattr(_socket, "IPV6_V6ONLY")
                  and hasattr(_socket, "IPPROTO_IPV6")):
                # Without dual-stack, make the socket IPv6-only explicitly.
                server.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 1)
        try:
            server.bind(address)
        except error as err:
            msg = '%s (while attempting to bind on address %r)' % \
                (err.strerror, address)
            raise error(err.errno, msg) from None
        if backlog is not None:
            server.listen(backlog)
        else:
            server.listen()
        return server
    except error:
        server.close()
        raise
def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve *host* and *port* via _socket.getaddrinfo().

    Returns a list of (family, type, proto, canonname, sockaddr) tuples in
    which family and type are converted to AddressFamily / SocketKind
    members whenever the numeric value maps to one.
    """
    return [
        (_intenum_converter(af, AddressFamily),
         _intenum_converter(kind, SocketKind),
         protocol, canonname, sockaddr)
        for af, kind, protocol, canonname, sockaddr
        in _socket.getaddrinfo(host, port, family, type, proto, flags)
    ]