import email.parser
import email.message
import errno
import http
import io
import re
import socket
import sys
import collections.abc
from urllib.parse import urlsplit
# Names exported by a star-import of this module.
__all__ = ["HTTPResponse", "HTTPConnection",
"HTTPException", "NotConnected", "UnknownProtocol",
"UnknownTransferEncoding", "UnimplementedFileMode",
"IncompleteRead", "InvalidURL", "ImproperConnectionState",
"CannotSendRequest", "CannotSendHeader", "ResponseNotReady",
"BadStatusLine", "LineTooLong", "RemoteDisconnected", "error",
"responses"]
# Default ports used by HTTPConnection / HTTPSConnection when none is given.
HTTP_PORT = 80
HTTPS_PORT = 443
# Placeholder stored in HTTPResponse attributes until begin() has parsed them.
_UNKNOWN = 'UNKNOWN'
# Request life-cycle states tracked by HTTPConnection:
# idle -> putrequest() -> endheaders() -> getresponse() -> idle.
_CS_IDLE = 'Idle'
_CS_REQ_STARTED = 'Request-started'
_CS_REQ_SENT = 'Request-sent'
# Publish every HTTPStatus member (CONTINUE, NO_CONTENT, ...) as a module global;
# HTTPResponse.begin() relies on these bare names.
globals().update(http.HTTPStatus.__members__)
# Map each HTTPStatus member to its reason phrase.
responses = {v: v.phrase for v in http.HTTPStatus.__members__.values()}
# Longest status/header/chunk-size/trailer line accepted, in bytes.
_MAXLINE = 65536
# Largest number of lines _read_headers() collects before raising.
_MAXHEADERS = 100
# Upper bound on the size of each individual read issued by
# HTTPResponse._safe_read() before it grows the request geometrically.
_MIN_READ_BUF_SIZE = 1 << 20
# Header names: must not start with ':' or whitespace, and may not contain
# ':', CR or LF.  Used by HTTPConnection.putheader().
_is_legal_header_name = re.compile(rb'[^:\s][^:\r\n]*').fullmatch
# Header values: finds an LF not followed by SP/HTAB, or a CR not followed by
# SP/HTAB/LF (i.e. a line break that is not a folded continuation).
_is_illegal_header_value = re.compile(rb'\n(?![ \t])|\r(?![ \t\n])').search
# Characters rejected in request targets and hosts: C0 controls, space, DEL.
_contains_disallowed_url_pchar_re = re.compile('[\x00-\x20\x7f]')
# Characters rejected in request methods: C0 controls.
_contains_disallowed_method_pchar_re = re.compile('[\x00-\x1f]')
# Methods for which a missing body is sent as "Content-Length: 0"
# (see HTTPConnection._get_content_length).
_METHODS_EXPECTING_BODY = {'PATCH', 'POST', 'PUT'}
def _encode(data, name='data'):
try:
return data.encode("latin-1")
except UnicodeEncodeError as err:
raise UnicodeEncodeError(
err.encoding,
err.object,
err.start,
err.end,
"%s (%.20r) is not valid Latin-1. Use %s.encode('utf-8') "
"if you want to send it encoded in UTF-8." %
(name.title(), data[err.start:err.end], name)) from None
def _strip_ipv6_iface(enc_name: bytes) -> bytes:
enc_name, percent, _ = enc_name.partition(b"%")
if percent:
assert enc_name.startswith(b'['), enc_name
enc_name += b']'
return enc_name
class HTTPMessage(email.message.Message):
    """Header container used for parsed HTTP response headers."""

    def getallmatchingheaders(self, name):
        """Return every header whose name equals *name*, ignoring case.

        Each occurrence is returned as one ``"Name: value"`` string, in
        message order and with the header name spelled as it is stored.
        A folded (multi-line) value stays inside its single entry.  An
        empty list is returned when the header does not occur.

        The previous implementation compared the stored header *names*
        against ``name + ':'``; a name never ends in a colon, so nothing
        could match and the result was always empty.
        """
        wanted = name.lower()
        return [
            "%s: %s" % (key, value)
            for key, value in self.items()
            if key.lower() == wanted
        ]
def _read_headers(fp):
headers = []
while True:
line = fp.readline(_MAXLINE + 1)
if len(line) > _MAXLINE:
raise LineTooLong("header line")
headers.append(line)
if len(headers) > _MAXHEADERS:
raise HTTPException("got more than %d headers" % _MAXHEADERS)
if line in (b'\r\n', b'\n', b''):
break
return headers
def _parse_header_lines(header_lines, _class=HTTPMessage):
hstring = b''.join(header_lines).decode('iso-8859-1')
return email.parser.Parser(_class=_class).parsestr(hstring)
def parse_headers(fp, _class=HTTPMessage):
    """Read a header block from *fp* and return it parsed.

    The raw lines are gathered by ``_read_headers`` (which enforces the
    line-length and header-count limits) and parsed by
    ``_parse_header_lines`` into an instance of *_class*.
    """
    return _parse_header_lines(_read_headers(fp), _class)
class HTTPResponse(io.BufferedIOBase):
    """File-like reader for a single HTTP response arriving on a socket.

    ``begin()`` parses the status line and headers.  The ``read*`` methods
    then return the body, honouring ``Content-Length`` and the chunked
    transfer coding.  When the body is exhausted the underlying file object
    is closed and ``isclosed()`` returns True.
    """

    def __init__(self, sock, debuglevel=0, method=None, url=None):
        """Wrap *sock* for reading.

        sock: object providing ``makefile("rb")``.
        debuglevel: values > 0 print the status line and headers.
        method: request method; ``"HEAD"`` responses never have a body.
        url: stored as ``self.url`` so that ``geturl()`` works.
        """
        self.fp = sock.makefile("rb")
        self.debuglevel = debuglevel
        self._method = method
        # geturl() reads self.url; keep the argument instead of dropping it.
        self.url = url

        # ``headers`` and ``msg`` are two names for the same object.
        self.headers = self.msg = None

        # Filled in by begin() from the status line.
        self.version = _UNKNOWN
        self.status = _UNKNOWN
        self.reason = _UNKNOWN

        # Filled in by begin() from the headers.
        self.chunked = _UNKNOWN
        self.chunk_left = _UNKNOWN
        self.length = _UNKNOWN
        self.will_close = _UNKNOWN

    def _read_status(self):
        """Read and parse the status line.

        Returns ``(version, status, reason)`` with *status* as an int.
        Raises LineTooLong, RemoteDisconnected (empty read) or
        BadStatusLine (malformed line or status outside 100-999).
        """
        line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
        if len(line) > _MAXLINE:
            raise LineTooLong("status line")
        if self.debuglevel > 0:
            print("reply:", repr(line))
        if not line:
            raise RemoteDisconnected("Remote end closed connection without"
                                     " response")
        try:
            version, status, reason = line.split(None, 2)
        except ValueError:
            try:
                # The reason phrase may be missing.
                version, status = line.split(None, 1)
                reason = ""
            except ValueError:
                # An empty version makes the check below fail.
                version = ""
        if not version.startswith("HTTP/"):
            self._close_conn()
            raise BadStatusLine(line)

        try:
            status = int(status)
            if status < 100 or status > 999:
                raise BadStatusLine(line)
        except ValueError:
            raise BadStatusLine(line)
        return version, status, reason

    def begin(self):
        """Parse the status line and headers; a no-op once headers exist.

        Sets ``status``/``code``, ``reason``, ``version``, ``headers``/
        ``msg``, ``chunked``, ``chunk_left``, ``length`` and ``will_close``.
        Raises UnknownProtocol for versions other than HTTP/0.9 and 1.x.
        """
        if self.headers is not None:
            return

        # Skip over any number of "100 Continue" interim responses.
        while True:
            version, status, reason = self._read_status()
            if status != CONTINUE:
                break
            skipped_headers = _read_headers(self.fp)
            if self.debuglevel > 0:
                print("headers:", skipped_headers)
            del skipped_headers

        self.code = self.status = status
        self.reason = reason.strip()
        if version in ("HTTP/1.0", "HTTP/0.9"):
            # HTTP/0.9 is treated like 1.0.
            self.version = 10
        elif version.startswith("HTTP/1."):
            self.version = 11
        else:
            raise UnknownProtocol(version)

        self.headers = self.msg = parse_headers(self.fp)

        if self.debuglevel > 0:
            for hdr, val in self.headers.items():
                print("header:", hdr + ":", val)

        tr_enc = self.headers.get("transfer-encoding")
        if tr_enc and tr_enc.lower() == "chunked":
            self.chunked = True
            self.chunk_left = None
        else:
            self.chunked = False

        self.will_close = self._check_close()

        # Content-Length is ignored when the body is chunked.
        self.length = None
        length = self.headers.get("content-length")
        if length and not self.chunked:
            try:
                self.length = int(length)
            except ValueError:
                self.length = None
            else:
                if self.length < 0:
                    # A negative length is treated as "unknown".
                    self.length = None
        else:
            self.length = None

        # These responses never carry a body.
        if (status == NO_CONTENT or status == NOT_MODIFIED or
                100 <= status < 200 or self._method == "HEAD"):
            self.length = 0

        # Without chunking or a length, the body ends when the peer closes.
        if (not self.will_close and
                not self.chunked and
                self.length is None):
            self.will_close = True

    def _check_close(self):
        """Return True when the connection will close after this response."""
        conn = self.headers.get("connection")
        if self.version == 11:
            # HTTP/1.1 is persistent unless "close" is requested.
            if conn and "close" in conn.lower():
                return True
            return False

        # Older versions close unless some keep-alive signal is present.
        if self.headers.get("keep-alive"):
            return False
        if conn and "keep-alive" in conn.lower():
            return False
        pconn = self.headers.get("proxy-connection")
        if pconn and "keep-alive" in pconn.lower():
            return False
        return True

    def _close_conn(self):
        """Close the underlying file object and mark the response closed."""
        fp = self.fp
        self.fp = None
        fp.close()

    def close(self):
        """Close the response, and the underlying file if still open."""
        try:
            super().close()
        finally:
            if self.fp:
                self._close_conn()

    def flush(self):
        """Flush this object and, when still open, the underlying file."""
        super().flush()
        if self.fp:
            self.fp.flush()

    def readable(self):
        """Always True."""
        return True

    def isclosed(self):
        """Return True once the underlying file object has been released."""
        return self.fp is None

    def read(self, amt=None):
        """Read and return up to *amt* body bytes (everything when None/negative).

        Returns ``b""`` when already closed or for HEAD responses.  Raises
        IncompleteRead when a declared length or chunk cannot be satisfied
        on a read-all.
        """
        if self.fp is None:
            return b""

        if self._method == "HEAD":
            self._close_conn()
            return b""

        if self.chunked:
            return self._read_chunked(amt)

        if amt is not None and amt >= 0:
            if self.length is not None and amt > self.length:
                # Never read past the end of this response.
                amt = self.length
            s = self.fp.read(amt)
            if not s and amt:
                # Premature EOF: close quietly instead of raising.
                self._close_conn()
            elif self.length is not None:
                self.length -= len(s)
                if not self.length:
                    self._close_conn()
            return s
        else:
            if self.length is None:
                s = self.fp.read()
            else:
                try:
                    s = self._safe_read(self.length)
                except IncompleteRead:
                    self._close_conn()
                    raise
                self.length = 0
            self._close_conn()
            return s

    def readinto(self, b):
        """Read body bytes into buffer *b*; return the number of bytes stored."""
        if self.fp is None:
            return 0

        if self._method == "HEAD":
            self._close_conn()
            return 0

        if self.chunked:
            return self._readinto_chunked(b)

        if self.length is not None:
            if len(b) > self.length:
                # Never read past the end of this response.
                b = memoryview(b)[0:self.length]

        n = self.fp.readinto(b)
        if not n and b:
            # Premature EOF: close quietly instead of raising.
            self._close_conn()
        elif self.length is not None:
            self.length -= n
            if not self.length:
                self._close_conn()
        return n

    def _read_next_chunk_size(self):
        """Read a chunk-size line and return the size as a non-negative int.

        Chunk extensions (after ``;``) are ignored.  Raises LineTooLong for
        an over-long line; for a malformed or negative size the connection
        is closed and ValueError is raised.
        """
        line = self.fp.readline(_MAXLINE + 1)
        if len(line) > _MAXLINE:
            raise LineTooLong("chunk size")
        i = line.find(b";")
        if i >= 0:
            line = line[:i]
        try:
            size = int(line, 16)
            if size < 0:
                # int() accepts a sign; a negative size would turn the next
                # read into fp.read(-n), i.e. "read until EOF".
                raise ValueError("negative chunk size: %d" % size)
            return size
        except ValueError:
            self._close_conn()
            raise

    def _read_and_discard_trailer(self):
        """Consume trailer lines after the last chunk, up to a blank line or EOF."""
        while True:
            line = self.fp.readline(_MAXLINE + 1)
            if len(line) > _MAXLINE:
                raise LineTooLong("trailer line")
            if line in (b'\r\n', b'\n', b''):
                break

    def _get_chunk_left(self):
        """Return bytes remaining in the current chunk, or None at end of body.

        When the current chunk is used up this reads the next chunk header
        (first discarding the CRLF that ends the previous chunk).  On the
        terminating zero-size chunk the trailer is discarded and the
        connection closed.  Raises IncompleteRead on a bad chunk size.
        """
        chunk_left = self.chunk_left
        if not chunk_left:  # 0 (chunk finished) or None (no chunk read yet)
            if chunk_left is not None:
                # Drop the CRLF that terminates the previous chunk.
                self._safe_read(2)
            try:
                chunk_left = self._read_next_chunk_size()
            except ValueError:
                raise IncompleteRead(b'')
            if chunk_left == 0:
                self._read_and_discard_trailer()
                self._close_conn()
                chunk_left = None
            self.chunk_left = chunk_left
        return chunk_left

    def _read_chunked(self, amt=None):
        """Read up to *amt* bytes (all when None/negative) of a chunked body."""
        assert self.chunked != _UNKNOWN
        if amt is not None and amt < 0:
            amt = None
        value = []
        try:
            while (chunk_left := self._get_chunk_left()) is not None:
                if amt is not None and amt <= chunk_left:
                    value.append(self._safe_read(amt))
                    self.chunk_left = chunk_left - amt
                    break

                value.append(self._safe_read(chunk_left))
                if amt is not None:
                    amt -= chunk_left
                self.chunk_left = 0
            return b''.join(value)
        except IncompleteRead as exc:
            # Report what was successfully read before the failure.
            raise IncompleteRead(b''.join(value)) from exc

    def _readinto_chunked(self, b):
        """Fill buffer *b* from a chunked body; return the number of bytes stored."""
        assert self.chunked != _UNKNOWN
        total_bytes = 0
        mvb = memoryview(b)
        try:
            while True:
                chunk_left = self._get_chunk_left()
                if chunk_left is None:
                    return total_bytes

                if len(mvb) <= chunk_left:
                    n = self._safe_readinto(mvb)
                    self.chunk_left = chunk_left - n
                    return total_bytes + n

                temp_mvb = mvb[:chunk_left]
                n = self._safe_readinto(temp_mvb)
                mvb = mvb[n:]
                total_bytes += n
                self.chunk_left = 0

        except IncompleteRead:
            raise IncompleteRead(bytes(b[0:total_bytes]))

    def _safe_read(self, amt):
        """Read exactly *amt* bytes or raise IncompleteRead.

        The first read is capped at _MIN_READ_BUF_SIZE; later reads at most
        double the amount already received, so a huge declared size is not
        requested in one go.
        """
        cursize = min(amt, _MIN_READ_BUF_SIZE)
        data = self.fp.read(cursize)
        if len(data) >= amt:
            return data
        if len(data) < cursize:
            raise IncompleteRead(data, amt - len(data))

        data = io.BytesIO(data)
        data.seek(0, 2)
        while True:
            # Geometric growth: each read is at most as large as what has
            # been requested so far.
            delta = min(cursize, amt - cursize)
            data.write(self.fp.read(delta))
            if data.tell() >= amt:
                return data.getvalue()
            cursize += delta
            if data.tell() < cursize:
                raise IncompleteRead(data.getvalue(), amt - data.tell())

    def _safe_readinto(self, b):
        """Fill buffer *b* completely or raise IncompleteRead."""
        amt = len(b)
        n = self.fp.readinto(b)
        if n < amt:
            raise IncompleteRead(bytes(b[:n]), amt-n)
        return n

    def read1(self, n=-1):
        """Read up to *n* bytes using the underlying file's ``read1``."""
        if self.fp is None or self._method == "HEAD":
            return b""
        if self.chunked:
            return self._read1_chunked(n)
        if self.length is not None and (n < 0 or n > self.length):
            n = self.length
        result = self.fp.read1(n)
        if not result and n:
            self._close_conn()
        elif self.length is not None:
            self.length -= len(result)
            if not self.length:
                self._close_conn()
        return result

    def peek(self, n=-1):
        """Return buffered body bytes without consuming them."""
        if self.fp is None or self._method == "HEAD":
            return b""
        if self.chunked:
            return self._peek_chunked(n)
        return self.fp.peek(n)

    def readline(self, limit=-1):
        """Read one line of the body, at most *limit* bytes when non-negative."""
        if self.fp is None or self._method == "HEAD":
            return b""
        if self.chunked:
            # Fall back to the generic implementation built on peek/read.
            return super().readline(limit)
        if self.length is not None and (limit < 0 or limit > self.length):
            limit = self.length
        result = self.fp.readline(limit)
        if not result and limit:
            self._close_conn()
        elif self.length is not None:
            self.length -= len(result)
            if not self.length:
                self._close_conn()
        return result

    def _read1_chunked(self, n):
        """``read1`` for chunked bodies; never crosses a chunk boundary."""
        chunk_left = self._get_chunk_left()
        if chunk_left is None or n == 0:
            return b''
        if not (0 <= n <= chunk_left):
            # n is negative or larger than what is left in this chunk.
            n = chunk_left
        read = self.fp.read1(n)
        self.chunk_left -= len(read)
        if not read:
            raise IncompleteRead(b"")
        return read

    def _peek_chunked(self, n):
        """``peek`` for chunked bodies; returns ``b''`` at EOF or on a bad chunk."""
        try:
            chunk_left = self._get_chunk_left()
        except IncompleteRead:
            return b''
        if chunk_left is None:
            return b''
        # Ask for the whole chunk and truncate to it.
        return self.fp.peek(chunk_left)[:chunk_left]

    def fileno(self):
        """Return the file descriptor of the underlying file object."""
        return self.fp.fileno()

    def getheader(self, name, default=None):
        """Return header *name*, joining repeated headers with ``', '``.

        *default* is returned when the header is absent; an iterable,
        non-string default is joined the same way.  Raises ResponseNotReady
        before the headers have been parsed.
        """
        if self.headers is None:
            raise ResponseNotReady()
        headers = self.headers.get_all(name) or default
        if isinstance(headers, str) or not hasattr(headers, '__iter__'):
            return headers
        else:
            return ', '.join(headers)

    def getheaders(self):
        """Return a list of ``(name, value)`` header tuples."""
        if self.headers is None:
            raise ResponseNotReady()
        return list(self.headers.items())

    def __iter__(self):
        """Return self."""
        return self

    def info(self):
        """Return the parsed headers object."""
        return self.headers

    def geturl(self):
        """Return the URL given to the constructor (or assigned to ``url``)."""
        return self.url

    def getcode(self):
        """Return the response status."""
        return self.status
def _create_https_context(http_version):
context = ssl._create_default_https_context()
if http_version == 11:
context.set_alpn_protocols(['http/1.1'])
if context.post_handshake_auth is not None:
context.post_handshake_auth = True
return context
class HTTPConnection:
    """Client side of one HTTP/1.1 connection.

    A request goes through ``putrequest()``, ``putheader()`` and
    ``endheaders()`` (or ``request()``, which does all three), followed by
    ``getresponse()``.  The private state attribute enforces that order.
    """

    _http_vsn = 11
    _http_vsn_str = 'HTTP/1.1'

    response_class = HTTPResponse
    default_port = HTTP_PORT
    auto_open = 1   # connect automatically on the first send()
    debuglevel = 0

    @staticmethod
    def _is_textIO(stream):
        """Return True when *stream* is a text-mode stream."""
        return isinstance(stream, io.TextIOBase)

    @staticmethod
    def _get_content_length(body, method):
        """Return the body length in bytes, or None when it cannot be known.

        A missing body counts as 0 for methods that expect one and as None
        otherwise.  File-like objects and non-buffer iterables give None.
        """
        if body is None:
            if method.upper() in _METHODS_EXPECTING_BODY:
                return 0
            else:
                return None

        if hasattr(body, 'read'):
            # File-like object: size unknown.
            return None

        try:
            # Anything supporting the buffer protocol.
            mv = memoryview(body)
            return mv.nbytes
        except TypeError:
            pass

        if isinstance(body, str):
            return len(body)

        return None

    def __init__(self, host, port=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                 source_address=None, blocksize=8192):
        """Record the connection parameters; no socket is opened here.

        host may include ``:port``; port defaults to ``default_port``.
        blocksize is the read size used when sending file-like bodies.
        Raises InvalidURL for a non-numeric port or a host containing
        control characters.
        """
        self.timeout = timeout
        self.source_address = source_address
        self.blocksize = blocksize
        self.sock = None
        self._buffer = []
        self.__response = None
        self.__state = _CS_IDLE
        self._method = None
        self._tunnel_host = None
        self._tunnel_port = None
        self._tunnel_headers = {}
        self._raw_proxy_headers = None

        (self.host, self.port) = self._get_hostport(host, port)

        self._validate_host(self.host)

        # Kept as an attribute so it can be replaced (e.g. in tests).
        self._create_connection = socket.create_connection

    def set_tunnel(self, host, port=None, headers=None):
        """Arrange for the connection to be tunnelled via HTTP CONNECT.

        host/port name the tunnel destination; headers is an optional
        mapping sent with the CONNECT request.  Unless the caller supplies
        a Host header, one is generated from the destination, with IPv6
        literals wrapped in brackets as the Host syntax requires.

        Raises RuntimeError when the socket is already connected.
        """
        if self.sock:
            raise RuntimeError("Can't set up tunnel for established connection")

        self._tunnel_host, self._tunnel_port = self._get_hostport(host, port)
        if headers:
            self._tunnel_headers = headers.copy()
        else:
            self._tunnel_headers.clear()

        if not any(header.lower() == "host" for header in self._tunnel_headers):
            # _get_hostport() strips the brackets of an IPv6 literal; put
            # them back, otherwise "2001:db8::1" + ":8443" is ambiguous.
            encoded_host = self._wrap_ipv6(
                self._tunnel_host.encode("idna")).decode("ascii")
            self._tunnel_headers["Host"] = "%s:%d" % (
                encoded_host, self._tunnel_port)

    def _get_hostport(self, host, port):
        """Split an optional ``:port`` off *host* and strip IPv6 brackets.

        Returns ``(host, port)``.  Raises InvalidURL for a non-numeric port.
        """
        if port is None:
            i = host.rfind(':')
            j = host.rfind(']')         # IPv6 literals are written [...]
            if i > j:
                try:
                    port = int(host[i+1:])
                except ValueError:
                    if host[i+1:] == "":
                        # "host:" means the default port.
                        port = self.default_port
                    else:
                        raise InvalidURL("nonnumeric port: '%s'" % host[i+1:])
                host = host[:i]
            else:
                port = self.default_port
        if host and host[0] == '[' and host[-1] == ']':
            host = host[1:-1]

        return (host, port)

    def set_debuglevel(self, level):
        """Set the debug output level; values > 0 print traffic details."""
        self.debuglevel = level

    def _wrap_ipv6(self, ip):
        """Return bytes *ip* in brackets when it contains ':' and has none yet."""
        if b':' in ip and ip[0] != b'['[0]:
            return b"[" + ip + b"]"
        return ip

    def _tunnel(self):
        """Send the CONNECT request and check the proxy's reply.

        The raw reply headers are kept for get_proxy_response_headers().
        Raises OSError (after closing the connection) when the proxy does
        not answer 200.
        """
        connect = b"CONNECT %s:%d %s\r\n" % (
            self._wrap_ipv6(self._tunnel_host.encode("idna")),
            self._tunnel_port,
            self._http_vsn_str.encode("ascii"))
        headers = [connect]
        for header, value in self._tunnel_headers.items():
            headers.append(f"{header}: {value}\r\n".encode("latin-1"))
        headers.append(b"\r\n")
        # One send() for the whole request rather than one per line.
        self.send(b"".join(headers))
        del headers

        response = self.response_class(self.sock, method=self._method)
        try:
            (version, code, message) = response._read_status()

            self._raw_proxy_headers = _read_headers(response.fp)

            if self.debuglevel > 0:
                for header in self._raw_proxy_headers:
                    print('header:', header.decode())

            if code != http.HTTPStatus.OK:
                self.close()
                raise OSError(f"Tunnel connection failed: {code} {message.strip()}")

        finally:
            response.close()

    def get_proxy_response_headers(self):
        """Return the parsed headers of the proxy's CONNECT reply, or None."""
        return (
            _parse_header_lines(self._raw_proxy_headers)
            if self._raw_proxy_headers is not None
            else None
        )

    def connect(self):
        """Open the socket and, if configured, establish the tunnel."""
        sys.audit("http.client.connect", self, self.host, self.port)
        self.sock = self._create_connection(
            (self.host,self.port), self.timeout, self.source_address)
        # TCP_NODELAY is best effort: ENOPROTOOPT is tolerated.
        try:
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except OSError as e:
            if e.errno != errno.ENOPROTOOPT:
                raise

        if self._tunnel_host:
            self._tunnel()

    def close(self):
        """Close the socket and any pending response; reset to idle."""
        self.__state = _CS_IDLE
        try:
            sock = self.sock
            if sock:
                self.sock = None
                sock.close()
        finally:
            # Runs even when closing the socket raised.
            response = self.__response
            if response:
                self.__response = None
                response.close()

    def send(self, data):
        """Send *data* to the server.

        data may be a bytes-like object, an object with ``read()`` (sent in
        ``blocksize`` pieces, text streams encoded as ISO-8859-1) or an
        iterable of bytes-like objects.  Connects first when needed and
        ``auto_open`` is set; otherwise raises NotConnected.
        """
        if self.sock is None:
            if self.auto_open:
                self.connect()
            else:
                raise NotConnected()

        if self.debuglevel > 0:
            print("send:", repr(data))
        if hasattr(data, "read") :
            if self.debuglevel > 0:
                print("sending a readable")
            encode = self._is_textIO(data)
            if encode and self.debuglevel > 0:
                print("encoding file using iso-8859-1")
            while datablock := data.read(self.blocksize):
                if encode:
                    datablock = datablock.encode("iso-8859-1")
                sys.audit("http.client.send", self, datablock)
                self.sock.sendall(datablock)
            return
        sys.audit("http.client.send", self, data)
        try:
            self.sock.sendall(data)
        except TypeError:
            # Not bytes-like: fall back to sending item by item.
            if isinstance(data, collections.abc.Iterable):
                for d in data:
                    self.sock.sendall(d)
            else:
                raise TypeError("data should be a bytes-like object "
                                "or an iterable, got %r" % type(data))

    def _output(self, s):
        """Queue one line (bytes, without CRLF) of the request head."""
        self._buffer.append(s)

    def _read_readable(self, readable):
        """Yield *readable*'s content in ``blocksize`` pieces as bytes."""
        if self.debuglevel > 0:
            print("reading a readable")
        encode = self._is_textIO(readable)
        if encode and self.debuglevel > 0:
            print("encoding file using iso-8859-1")
        while datablock := readable.read(self.blocksize):
            if encode:
                datablock = datablock.encode("iso-8859-1")
            yield datablock

    def _send_output(self, message_body=None, encode_chunked=False):
        """Send the queued request head, then the optional body.

        With *encode_chunked* on an HTTP/1.1 connection each non-empty
        piece is framed as a chunk and a terminating chunk is sent.
        Raises TypeError for a body that is neither file-like, bytes-like
        nor iterable.
        """
        # Two empty items give the blank line that ends the header block.
        self._buffer.extend((b"", b""))
        msg = b"\r\n".join(self._buffer)
        del self._buffer[:]
        self.send(msg)

        if message_body is not None:

            # Normalise every body form to an iterable of pieces.
            if hasattr(message_body, 'read'):
                chunks = self._read_readable(message_body)
            else:
                try:
                    memoryview(message_body)
                except TypeError:
                    try:
                        chunks = iter(message_body)
                    except TypeError:
                        raise TypeError("message_body should be a bytes-like "
                                        "object or an iterable, got %r"
                                        % type(message_body))
                else:
                    # Bytes-like: send as a single piece.
                    chunks = (message_body,)

            for chunk in chunks:
                if not chunk:
                    # An empty chunk would terminate a chunked body early.
                    if self.debuglevel > 0:
                        print('Zero length chunk ignored')
                    continue

                if encode_chunked and self._http_vsn == 11:
                    chunk = f'{len(chunk):X}\r\n'.encode('ascii') + chunk \
                        + b'\r\n'
                self.send(chunk)

            if encode_chunked and self._http_vsn == 11:
                self.send(b'0\r\n\r\n')

    def putrequest(self, method, url, skip_host=False,
                   skip_accept_encoding=False):
        """Queue the request line and the automatic headers.

        skip_host / skip_accept_encoding suppress the generated ``Host``
        and ``Accept-Encoding: identity`` headers.  Raises CannotSendRequest
        when a request is already in progress, ValueError for a bad method
        and InvalidURL for a bad url.
        """
        # A finished previous response no longer blocks a new request.
        if self.__response and self.__response.isclosed():
            self.__response = None

        if self.__state == _CS_IDLE:
            self.__state = _CS_REQ_STARTED
        else:
            raise CannotSendRequest(self.__state)

        self._validate_method(method)

        # Remembered for the response (HEAD responses have no body).
        self._method = method

        url = url or '/'
        self._validate_path(url)

        request = '%s %s %s' % (method, url, self._http_vsn_str)

        self._output(self._encode_request(request))

        if self._http_vsn == 11:
            if not skip_host:
                # An absolute URL supplies the Host value itself.
                netloc = ''
                if url.startswith('http'):
                    nil, netloc, nil, nil, nil = urlsplit(url)

                if netloc:
                    try:
                        netloc_enc = netloc.encode("ascii")
                    except UnicodeEncodeError:
                        netloc_enc = netloc.encode("idna")
                    self.putheader('Host', _strip_ipv6_iface(netloc_enc))
                else:
                    if self._tunnel_host:
                        host = self._tunnel_host
                        port = self._tunnel_port
                    else:
                        host = self.host
                        port = self.port

                    try:
                        host_enc = host.encode("ascii")
                    except UnicodeEncodeError:
                        host_enc = host.encode("idna")

                    # IPv6 literals are bracketed in the Host header.
                    host_enc = self._wrap_ipv6(host_enc)
                    if ":" in host:
                        host_enc = _strip_ipv6_iface(host_enc)

                    if port == self.default_port:
                        self.putheader('Host', host_enc)
                    else:
                        host_enc = host_enc.decode("ascii")
                        self.putheader('Host', "%s:%s" % (host_enc, port))

            if not skip_accept_encoding:
                self.putheader('Accept-Encoding', 'identity')
        else:
            # Nothing extra is sent for other protocol versions.
            pass

    def _encode_request(self, request):
        """Return the request line encoded as ASCII."""
        return request.encode('ascii')

    def _validate_method(self, method):
        """Raise ValueError when *method* contains control characters."""
        match = _contains_disallowed_method_pchar_re.search(method)
        if match:
            raise ValueError(
                    f"method can't contain control characters. {method!r} "
                    f"(found at least {match.group()!r})")

    def _validate_path(self, url):
        """Raise InvalidURL when *url* contains control characters or spaces."""
        match = _contains_disallowed_url_pchar_re.search(url)
        if match:
            raise InvalidURL(f"URL can't contain control characters. {url!r} "
                             f"(found at least {match.group()!r})")

    def _validate_host(self, host):
        """Raise InvalidURL when *host* contains control characters or spaces."""
        match = _contains_disallowed_url_pchar_re.search(host)
        if match:
            raise InvalidURL(f"URL can't contain control characters. {host!r} "
                             f"(found at least {match.group()!r})")

    def putheader(self, header, *values):
        """Queue one request header.

        Several *values* are emitted as continuation lines.  str values are
        encoded as Latin-1 and ints as ASCII digits.  Raises
        CannotSendHeader outside the header phase and ValueError for an
        illegal name or value.
        """
        if self.__state != _CS_REQ_STARTED:
            raise CannotSendHeader()

        if hasattr(header, 'encode'):
            header = header.encode('ascii')

        if not _is_legal_header_name(header):
            raise ValueError('Invalid header name %r' % (header,))

        values = list(values)
        for i, one_value in enumerate(values):
            if hasattr(one_value, 'encode'):
                values[i] = one_value.encode('latin-1')
            elif isinstance(one_value, int):
                values[i] = str(one_value).encode('ascii')

            if _is_illegal_header_value(values[i]):
                raise ValueError('Invalid header value %r' % (values[i],))

        value = b'\r\n\t'.join(values)
        header = header + b': ' + value
        self._output(header)

    def endheaders(self, message_body=None, *, encode_chunked=False):
        """Finish the header block and send the request (with optional body).

        Raises CannotSendHeader when no request has been started.
        """
        if self.__state == _CS_REQ_STARTED:
            self.__state = _CS_REQ_SENT
        else:
            raise CannotSendHeader()
        self._send_output(message_body, encode_chunked=encode_chunked)

    def request(self, method, url, body=None, headers={}, *,
                encode_chunked=False):
        """Send a complete request.  *headers* is only read, never modified."""
        self._send_request(method, url, body, headers, encode_chunked)

    def _send_request(self, method, url, body, headers, encode_chunked):
        """Implementation of request(): choose framing headers and send."""
        # Caller-supplied Host / Accept-Encoding replace the automatic ones.
        header_names = frozenset(k.lower() for k in headers)
        skips = {}
        if 'host' in header_names:
            skips['skip_host'] = 1
        if 'accept-encoding' in header_names:
            skips['skip_accept_encoding'] = 1

        self.putrequest(method, url, **skips)

        # Framing: an explicit Content-Length wins; an explicit
        # Transfer-Encoding leaves encode_chunked to the caller; otherwise
        # use Content-Length when the size is known and chunking when not.
        if 'content-length' not in header_names:
            if 'transfer-encoding' not in header_names:
                encode_chunked = False
                content_length = self._get_content_length(body, method)
                if content_length is None:
                    if body is not None:
                        if self.debuglevel > 0:
                            print('Unable to determine size of %r' % body)
                        encode_chunked = True
                        self.putheader('Transfer-Encoding', 'chunked')
                else:
                    self.putheader('Content-Length', str(content_length))
        else:
            encode_chunked = False

        for hdr, value in headers.items():
            self.putheader(hdr, value)
        if isinstance(body, str):
            # str bodies go out as Latin-1.
            body = _encode(body, 'body')
        self.endheaders(body, encode_chunked=encode_chunked)

    def getresponse(self):
        """Return the response to the request that was just sent.

        Raises ResponseNotReady when no request has been fully sent or a
        previous response is still open.  If reading the response head
        fails the response is closed before the error propagates.
        """
        # A finished previous response no longer blocks us.
        if self.__response and self.__response.isclosed():
            self.__response = None

        if self.__state != _CS_REQ_SENT or self.__response:
            raise ResponseNotReady(self.__state)

        if self.debuglevel > 0:
            response = self.response_class(self.sock, self.debuglevel,
                                           method=self._method)
        else:
            response = self.response_class(self.sock, method=self._method)

        try:
            try:
                response.begin()
            except ConnectionError:
                self.close()
                raise
            assert response.will_close != _UNKNOWN
            self.__state = _CS_IDLE

            if response.will_close:
                # The response keeps reading from its own file object.
                self.close()
            else:
                # Remembered so the next request can check it is finished.
                self.__response = response

            return response
        except:
            # Deliberately bare: clean up on any exception, then re-raise.
            response.close()
            raise
try:
    import ssl
except ImportError:
    # No ssl module: HTTPSConnection is simply not defined or exported.
    pass
else:
    class HTTPSConnection(HTTPConnection):
        """HTTPConnection whose socket is wrapped by an SSL context."""

        default_port = HTTPS_PORT

        def __init__(self, host, port=None,
                     *, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                     source_address=None, context=None, blocksize=8192):
            """Same as HTTPConnection, plus an optional SSL *context*.

            When *context* is None one is built by _create_https_context()
            for this class's HTTP version.
            """
            super().__init__(host, port, timeout, source_address,
                             blocksize=blocksize)
            self._context = (
                _create_https_context(self._http_vsn)
                if context is None else context)

        def connect(self):
            """Connect (and tunnel if configured), then wrap the socket.

            The tunnel destination, when set, is used as the server
            hostname passed to ``wrap_socket``; otherwise the host is.
            """
            super().connect()
            server_hostname = self._tunnel_host or self.host
            self.sock = self._context.wrap_socket(
                self.sock, server_hostname=server_hostname)

    __all__.append("HTTPSConnection")
class HTTPException(Exception):
    """Root of this module's exception hierarchy (also exported as ``error``)."""
class NotConnected(HTTPException):
    """Raised by HTTPConnection.send() when there is no socket and auto_open is off."""
class InvalidURL(HTTPException):
    """Raised for a non-numeric port or a host/URL containing control characters."""
class UnknownProtocol(HTTPException):
    """Raised by HTTPResponse.begin() for an unsupported HTTP version string."""

    def __init__(self, version):
        # args is assigned directly instead of calling Exception.__init__.
        self.args = (version,)
        self.version = version
class UnknownTransferEncoding(HTTPException):
    """Exported exception type; not raised by the code in this file."""
class UnimplementedFileMode(HTTPException):
    """Exported exception type; not raised by the code in this file."""
class IncompleteRead(HTTPException):
    """Raised when a body read ends before the expected number of bytes.

    ``partial`` holds the bytes that were received; ``expected`` is the
    number of bytes still missing, or None when that is unknown.
    """

    def __init__(self, partial, expected=None):
        self.args = (partial,)
        self.partial = partial
        self.expected = expected

    def __repr__(self):
        suffix = ''
        if self.expected is not None:
            suffix = ', %i more expected' % self.expected
        cls_name = self.__class__.__name__
        return '%s(%i bytes read%s)' % (cls_name, len(self.partial), suffix)

    # str() shows the repr instead of the raw partial data.
    __str__ = object.__str__
class ImproperConnectionState(HTTPException):
    """Base class for errors about calling connection methods out of order."""
class CannotSendRequest(ImproperConnectionState):
    """Raised by putrequest() when the connection is not idle."""
class CannotSendHeader(ImproperConnectionState):
    """Raised by putheader()/endheaders() when no request has been started."""
class ResponseNotReady(ImproperConnectionState):
    """Raised when a response (or its headers) is requested before it is available."""
class BadStatusLine(HTTPException):
    """Raised when the status line of a response cannot be parsed.

    ``line`` holds the offending line; an empty line is stored as its
    repr so the message is never blank.
    """

    def __init__(self, line):
        shown = line if line else repr(line)
        self.args = (shown,)
        self.line = shown
class LineTooLong(HTTPException):
    """Raised when a protocol line is longer than _MAXLINE bytes.

    *line_type* names what was being read (for example "header line").
    """

    def __init__(self, line_type):
        message = ("got more than %d bytes when reading %s"
                   % (_MAXLINE, line_type))
        HTTPException.__init__(self, message)
class RemoteDisconnected(ConnectionResetError, BadStatusLine):
    # Raised by HTTPResponse._read_status() when the status line read
    # returns nothing.  It is both a ConnectionResetError and a
    # BadStatusLine, so handlers for either type catch it.
    def __init__(self, *pos, **kw):
        # BadStatusLine first: it sets ``line`` (and a provisional ``args``).
        BadStatusLine.__init__(self, "")
        # Then the OSError side, which receives the caller's arguments.
        ConnectionResetError.__init__(self, *pos, **kw)
# Alias for HTTPException, exported in __all__ as "error".
error = HTTPException