# Version string of this module; the request handler classes in this file
# append it to their ``server_version`` attribute.
__version__ = "0.6"

# Names exported by ``from <this module> import *``.
__all__ = [
    "HTTPServer", "ThreadingHTTPServer",
    "HTTPSServer", "ThreadingHTTPSServer",
    "BaseHTTPRequestHandler", "SimpleHTTPRequestHandler",
    "CGIHTTPRequestHandler",
]
import copy
import datetime
import email.utils
import html
import http.client
import io
import itertools
import mimetypes
import os
import posixpath
import select
import shutil
import socket
import socketserver
import sys
import time
import urllib.parse
from http import HTTPStatus
# Default HTML template for error pages.  It is filled in with %-formatting
# from a mapping that supplies the keys ``code``, ``message`` and ``explain``
# (see BaseHTTPRequestHandler.send_error).
DEFAULT_ERROR_MESSAGE = """\
<!DOCTYPE HTML>
<html lang="en">
<head>
<meta charset="utf-8">
<style type="text/css">
:root {
color-scheme: light dark;
}
</style>
<title>Error response</title>
</head>
<body>
<h1>Error response</h1>
<p>Error code: %(code)d</p>
<p>Message: %(message)s.</p>
<p>Error code explanation: %(code)s - %(explain)s.</p>
</body>
</html>
"""

# Default Content-Type header value for error pages.
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"

# Cap (1 << 20 bytes) on the size of the first read of a POST body in
# CGIHTTPRequestHandler.run_cgi.
_MIN_READ_BUF_SIZE = 1 << 20
class HTTPServer(socketserver.TCPServer):
    """TCP server that records its own host name and port once bound."""

    # Class-level switches read by the socketserver base class.
    allow_reuse_address = True
    allow_reuse_port = False

    def server_bind(self):
        """Bind the socket, then store ``server_name`` and ``server_port``.

        The name is the fully qualified form of the bound host as returned
        by ``socket.getfqdn``; the port is taken from ``server_address``.
        """
        socketserver.TCPServer.server_bind(self)
        bound_host, bound_port = self.server_address[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port
class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ``socketserver.ThreadingMixIn``."""

    # Setting consumed by ThreadingMixIn.
    daemon_threads = True
class HTTPSServer(HTTPServer):
    """HTTPServer whose listening socket is wrapped in TLS when activated."""

    def __init__(self, server_address, RequestHandlerClass,
                 bind_and_activate=True, *, certfile, keyfile=None,
                 password=None, alpn_protocols=None):
        """Remember the TLS settings and initialise the underlying server.

        ``certfile``, ``keyfile`` and ``password`` are later handed to
        ``SSLContext.load_cert_chain``; ``alpn_protocols`` falls back to
        ``["http/1.1"]`` when it is None.

        Raises RuntimeError when the ``ssl`` module cannot be imported.
        """
        try:
            import ssl
        except ImportError:
            raise RuntimeError("SSL module is missing; "
                               "HTTPS support is unavailable")

        if alpn_protocols is None:
            chosen_protocols = ["http/1.1"]
        else:
            chosen_protocols = alpn_protocols

        self.ssl = ssl
        self.certfile = certfile
        self.keyfile = keyfile
        self.password = password
        self.alpn_protocols = chosen_protocols

        # The base initialiser may bind and activate right away, so every
        # TLS attribute has to be in place before it runs.
        super().__init__(server_address, RequestHandlerClass,
                         bind_and_activate)

    def server_activate(self):
        """Activate the server, then swap the socket for its TLS wrapper."""
        super().server_activate()
        tls_context = self._create_context()
        self.socket = tls_context.wrap_socket(self.socket, server_side=True)

    def _create_context(self):
        """Build the SSLContext from the stored certificate and ALPN settings."""
        tls_context = self.ssl.create_default_context(
            self.ssl.Purpose.CLIENT_AUTH)
        tls_context.load_cert_chain(self.certfile, self.keyfile, self.password)
        tls_context.set_alpn_protocols(self.alpn_protocols)
        return tls_context
class ThreadingHTTPSServer(socketserver.ThreadingMixIn, HTTPSServer):
    """HTTPSServer combined with ``socketserver.ThreadingMixIn``."""

    # Setting consumed by ThreadingMixIn.
    daemon_threads = True
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
    """Base request handler: parses HTTP requests and dispatches them.

    Each request is read and parsed by ``handle_one_request``, which then
    calls the method named ``do_<COMMAND>`` (for example ``do_GET``) when
    the handler defines one and answers 501 otherwise.
    """

    # "Python/<version>" derived from the running interpreter; joined with
    # server_version by version_string().
    sys_version = "Python/" + sys.version.split()[0]

    # Product token of this handler class, also used by version_string().
    server_version = "BaseHTTP/" + __version__

    # Template and Content-Type used by send_error() for error bodies.
    error_message_format = DEFAULT_ERROR_MESSAGE
    error_content_type = DEFAULT_ERROR_CONTENT_TYPE

    # Version assumed by parse_request() when the request line carries none.
    default_request_version = "HTTP/0.9"
def parse_request(self):
    """Parse the request line in ``self.raw_requestline`` and the headers.

    Sets ``command``, ``path``, ``request_version``, ``requestline``,
    ``headers`` and ``close_connection`` on the handler.

    Returns True when the request can be processed further.  Returns False
    when the request line is empty, when an error response has been sent
    through ``send_error``, or when ``handle_expect_100`` returns a false
    value.
    """
    is_http_0_9 = False
    self.command = None  # remains None if the request line is rejected
    self.request_version = version = self.default_request_version
    self.close_connection = True
    requestline = str(self.raw_requestline, 'iso-8859-1')
    requestline = requestline.rstrip('\r\n')
    self.requestline = requestline
    words = requestline.split()
    if len(words) == 0:
        return False

    if len(words) >= 3:
        # With three or more words the last one is taken as the version.
        version = words[-1]
        try:
            if not version.startswith('HTTP/'):
                raise ValueError
            base_version_number = version.split('/', 1)[1]
            version_number = base_version_number.split(".")
            # Exactly "<major>.<minor>", digits only, each component at
            # most 10 characters long; the two parts are compared as
            # separate integers below.
            if len(version_number) != 2:
                raise ValueError
            if any(not component.isdigit() for component in version_number):
                raise ValueError("non digit in http version")
            if any(len(component) > 10 for component in version_number):
                raise ValueError("unreasonable length http version")
            version_number = int(version_number[0]), int(version_number[1])
        except (ValueError, IndexError):
            self.send_error(
                HTTPStatus.BAD_REQUEST,
                "Bad request version (%r)" % version)
            return False
        # Keep the connection open by default only when both the client
        # and this handler speak HTTP/1.1 or later.
        if version_number >= (1, 1) and self.protocol_version >= "HTTP/1.1":
            self.close_connection = False
        if version_number >= (2, 0):
            self.send_error(
                HTTPStatus.HTTP_VERSION_NOT_SUPPORTED,
                "Invalid HTTP version (%s)" % base_version_number)
            return False
        self.request_version = version

    if not 2 <= len(words) <= 3:
        self.send_error(
            HTTPStatus.BAD_REQUEST,
            "Bad request syntax (%r)" % requestline)
        return False
    command, path = words[:2]
    if len(words) == 2:
        # A two-word request line is treated as HTTP/0.9: GET only, and
        # the connection is always closed afterwards.
        self.close_connection = True
        if command != 'GET':
            self.send_error(
                HTTPStatus.BAD_REQUEST,
                "Bad HTTP/0.9 request type (%r)" % command)
            return False
        is_http_0_9 = True
    self.command, self.path = command, path

    # Collapse any run of leading slashes into a single "/".
    if self.path.startswith('//'):
        self.path = '/' + self.path.lstrip('/')

    # HTTP/0.9 requests get an empty header mapping; no headers are read.
    if is_http_0_9:
        self.headers = {}
        return True

    # Read the header block; both failure modes are answered with 431.
    try:
        self.headers = http.client.parse_headers(self.rfile,
                                                 _class=self.MessageClass)
    except http.client.LineTooLong as err:
        self.send_error(
            HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE,
            "Line too long",
            str(err))
        return False
    except http.client.HTTPException as err:
        self.send_error(
            HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE,
            "Too many headers",
            str(err)
        )
        return False

    # A Connection header overrides the keep-alive default chosen above.
    conntype = self.headers.get('Connection', "")
    if conntype.lower() == 'close':
        self.close_connection = True
    elif (conntype.lower() == 'keep-alive' and
          self.protocol_version >= "HTTP/1.1"):
        self.close_connection = False
    # "Expect: 100-continue" is honoured only when both sides are at
    # HTTP/1.1 or later.
    expect = self.headers.get('Expect', "")
    if (expect.lower() == "100-continue" and
            self.protocol_version >= "HTTP/1.1" and
            self.request_version >= "HTTP/1.1"):
        if not self.handle_expect_100():
            return False
    return True
def handle_expect_100(self):
    """Answer an ``Expect: 100-continue`` request header.

    Emits a bare "100 Continue" status line followed by the end of the
    header block and returns True so that request parsing carries on.
    """
    self.send_response_only(HTTPStatus.CONTINUE)
    self.end_headers()
    return True
def handle_one_request(self):
    """Read, parse and dispatch a single HTTP request.

    Request lines longer than 65536 bytes are answered with 414.  An empty
    read marks the connection for closing.  A parsed request is passed to
    the ``do_<command>`` method when one exists and answered with 501
    otherwise.  A TimeoutError anywhere in the process is logged and marks
    the connection for closing.
    """
    try:
        request_line = self.rfile.readline(65537)
        self.raw_requestline = request_line
        if len(request_line) > 65536:
            # Blank out the request attributes before reporting the error.
            self.requestline = self.request_version = self.command = ''
            self.send_error(HTTPStatus.REQUEST_URI_TOO_LONG)
        elif not request_line:
            self.close_connection = True
        elif self.parse_request():
            handler_name = 'do_' + self.command
            if hasattr(self, handler_name):
                getattr(self, handler_name)()
                # Push out whatever the handler left in the output buffer.
                self.wfile.flush()
            else:
                self.send_error(
                    HTTPStatus.NOT_IMPLEMENTED,
                    "Unsupported method (%r)" % self.command)
        # When parse_request() returns false there is nothing more to do.
    except TimeoutError as exc:
        self.log_error("Request timed out: %r", exc)
        self.close_connection = True
def handle(self):
    """Serve requests on this connection until it is marked for closing.

    At least one request is always handled; further ones follow as long
    as ``close_connection`` is false after the previous request.
    """
    self.close_connection = True
    while True:
        self.handle_one_request()
        if self.close_connection:
            break
def send_error(self, code, message=None, explain=None):
    """Log an error and send the matching error response.

    ``code`` is the HTTP status.  ``message`` and ``explain`` default to
    the short and long texts registered in ``self.responses`` for that
    code, or to '???' when the code is unknown.

    The response always carries "Connection: close".  An HTML body built
    from ``error_message_format`` is attached unless the code is below 200
    or is one of 204, 205 and 304; the body is not written for HEAD
    requests.
    """
    try:
        default_message, default_explain = self.responses[code]
    except KeyError:
        default_message = default_explain = '???'
    if message is None:
        message = default_message
    if explain is None:
        explain = default_explain

    self.log_error("code %d, message %s", code, message)
    self.send_response(code, message)
    self.send_header('Connection', 'close')

    bodyless_codes = (HTTPStatus.NO_CONTENT,
                      HTTPStatus.RESET_CONTENT,
                      HTTPStatus.NOT_MODIFIED)
    payload = None
    if code >= 200 and code not in bodyless_codes:
        # Message and explanation are HTML-escaped before being placed in
        # the template.
        substitutions = {
            'code': code,
            'message': html.escape(message, quote=False),
            'explain': html.escape(explain, quote=False)
        }
        page = self.error_message_format % substitutions
        payload = page.encode('UTF-8', 'replace')
        self.send_header("Content-Type", self.error_content_type)
        self.send_header('Content-Length', str(len(payload)))
    self.end_headers()

    if self.command != 'HEAD' and payload:
        self.wfile.write(payload)
def send_response(self, code, message=None):
    """Log the request and queue the status line plus standard headers.

    After the status line produced by ``send_response_only`` a "Server"
    header and a "Date" header are queued, in that order.
    """
    self.log_request(code)
    self.send_response_only(code, message)

    server_token = self.version_string()
    self.send_header('Server', server_token)

    current_date = self.date_time_string()
    self.send_header('Date', current_date)
def send_response_only(self, code, message=None):
    """Queue the status line only; nothing is queued for HTTP/0.9 requests.

    When ``message`` is None the short phrase registered for ``code`` in
    ``self.responses`` is used, or an empty string for unknown codes.
    """
    if self.request_version == 'HTTP/0.9':
        return
    if message is None:
        message = self.responses[code][0] if code in self.responses else ''
    if not hasattr(self, '_headers_buffer'):
        self._headers_buffer = []
    status_line = "%s %d %s\r\n" % (self.protocol_version, code, message)
    self._headers_buffer.append(status_line.encode('latin-1', 'strict'))
def send_header(self, keyword, value):
    """Queue one header line and track Connection directives.

    The line is buffered unless the request is HTTP/0.9.  Independently
    of that, a "Connection" header with the value "close" or "keep-alive"
    (case-insensitive) updates ``close_connection``.
    """
    if self.request_version != 'HTTP/0.9':
        if not hasattr(self, '_headers_buffer'):
            self._headers_buffer = []
        header_line = "%s: %s\r\n" % (keyword, value)
        self._headers_buffer.append(header_line.encode('latin-1', 'strict'))

    if keyword.lower() != 'connection':
        return
    directive = value.lower()
    if directive == 'close':
        self.close_connection = True
    elif directive == 'keep-alive':
        self.close_connection = False
def end_headers(self):
    """Terminate the header block with a blank line and flush it.

    Does nothing for HTTP/0.9 requests.
    """
    if self.request_version == 'HTTP/0.9':
        return
    self._headers_buffer.append(b"\r\n")
    self.flush_headers()
def flush_headers(self):
    """Write all buffered header bytes to ``wfile`` and reset the buffer.

    Does nothing when no header buffer has been created yet.
    """
    if not hasattr(self, '_headers_buffer'):
        return
    self.wfile.write(b"".join(self._headers_buffer))
    self._headers_buffer = []
def log_request(self, code='-', size='-'):
    """Log the request line together with a status code and a size.

    An ``HTTPStatus`` member is reduced to its numeric value first.
    """
    status = code.value if isinstance(code, HTTPStatus) else code
    self.log_message('"%s" %s %s', self.requestline, str(status), str(size))
def log_error(self, format, *args):
    """Log an error; the arguments are passed straight to ``log_message``."""
    self.log_message(format, *args)
# Translation table applied by log_message(): every code point in the ranges
# 0x00-0x1f and 0x7f-0x9f becomes a literal "\xNN" escape sequence ...
_control_char_table = str.maketrans(
    {c: fr'\x{c:02x}' for c in itertools.chain(range(0x20), range(0x7f,0xa0))})
# ... and a backslash becomes two backslashes.
_control_char_table[ord('\\')] = r'\\'
def log_message(self, format, *args):
    """Write one formatted log line to ``sys.stderr``.

    ``format % args`` yields the message, which is passed through
    ``_control_char_table`` before being written.  The line has the shape
    "<client address> - - [<timestamp>] <message>".
    """
    rendered = format % args
    line = "%s - - [%s] %s\n" % (
        self.address_string(),
        self.log_date_time_string(),
        rendered.translate(self._control_char_table),
    )
    sys.stderr.write(line)
def version_string(self):
    """Return ``server_version`` and ``sys_version`` separated by a space."""
    return ' '.join((self.server_version, self.sys_version))
def date_time_string(self, timestamp=None):
    """Format ``timestamp`` (default: the current time) with ``formatdate``.

    The result is what ``email.utils.formatdate(..., usegmt=True)`` returns.
    """
    moment = time.time() if timestamp is None else timestamp
    return email.utils.formatdate(moment, usegmt=True)
def log_date_time_string(self):
    """Return the current local time formatted as "DD/Mon/YYYY HH:MM:SS"."""
    local = time.localtime(time.time())
    return "%02d/%3s/%04d %02d:%02d:%02d" % (
        local.tm_mday, self.monthname[local.tm_mon], local.tm_year,
        local.tm_hour, local.tm_min, local.tm_sec)
# Abbreviated English weekday names, starting with Monday.
weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

# Abbreviated English month names; index 0 is a None placeholder so that a
# month number 1-12 can be used directly (see log_date_time_string()).
monthname = [None,
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def address_string(self):
    """Return the first element of ``client_address`` for logging."""
    client_host = self.client_address[0]
    return client_host
# HTTP version written in the status line; parse_request() also compares it
# with "HTTP/1.1" when deciding on keep-alive and Expect handling.
protocol_version = "HTTP/1.0"

# Class passed to http.client.parse_headers() for the parsed headers.
MessageClass = http.client.HTTPMessage

# Maps every HTTPStatus member to a (phrase, description) pair; consulted by
# send_error() and send_response_only().
responses = {
    v: (v.phrase, v.description)
    for v in HTTPStatus.__members__.values()
}
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler that answers GET and HEAD from a directory tree.

    Files are served relative to ``self.directory``; a directory without an
    index page is answered with a generated HTML listing.
    """

    # Product token for this handler class.
    server_version = "SimpleHTTP/" + __version__

    # File names tried, in order, when a directory is requested.
    index_pages = ("index.html", "index.htm")

    # Extension -> content type table checked first by guess_type().  The
    # same dict object is also bound to _encodings_map_default.
    extensions_map = _encodings_map_default = {
        '.gz': 'application/gzip',
        '.Z': 'application/octet-stream',
        '.bz2': 'application/x-bzip2',
        '.xz': 'application/x-xz',
    }
def __init__(self, *args, directory=None, **kwargs):
    """Record the directory to serve, then initialise the base handler.

    ``directory`` defaults to the current working directory and is stored
    as ``self.directory`` after conversion with ``os.fspath``.  It is set
    before the base initialiser runs.
    """
    root = os.getcwd() if directory is None else directory
    self.directory = os.fspath(root)
    super().__init__(*args, **kwargs)
def do_GET(self):
    """Handle GET: send the headers, then copy the body to the client.

    The object returned by ``send_head`` is always closed after copying.
    """
    body_source = self.send_head()
    if not body_source:
        return
    try:
        self.copyfile(body_source, self.wfile)
    finally:
        body_source.close()
def do_HEAD(self):
    """Handle HEAD: send the headers and close the body source unread."""
    body_source = self.send_head()
    if not body_source:
        return
    body_source.close()
def send_head(self):
    """Send the status line and headers for the requested path.

    Returns an open binary file object (or the object produced by
    ``list_directory``) whose contents the caller sends as the body and
    must close.  Returns None when the response is already complete: a
    redirect, an error, or a 304 Not Modified.
    """
    path = self.translate_path(self.path)
    f = None
    if os.path.isdir(path):
        parts = urllib.parse.urlsplit(self.path)
        if not parts.path.endswith(('/', '%2f', '%2F')):
            # Directory requested without a trailing slash: redirect to
            # the same URL with "/" appended to its path component.
            self.send_response(HTTPStatus.MOVED_PERMANENTLY)
            new_parts = (parts[0], parts[1], parts[2] + '/',
                         parts[3], parts[4])
            new_url = urllib.parse.urlunsplit(new_parts)
            self.send_header("Location", new_url)
            self.send_header("Content-Length", "0")
            self.end_headers()
            return None
        # Serve the first index page that exists; if there is none, fall
        # back to a generated directory listing.
        for index in self.index_pages:
            index = os.path.join(path, index)
            if os.path.isfile(index):
                path = index
                break
        else:
            return self.list_directory(path)
    ctype = self.guess_type(path)
    # A path still ending in "/" at this point is not a directory (that
    # case was handled above), so it is answered with 404.
    if path.endswith("/"):
        self.send_error(HTTPStatus.NOT_FOUND, "File not found")
        return None
    try:
        f = open(path, 'rb')
    except OSError:
        self.send_error(HTTPStatus.NOT_FOUND, "File not found")
        return None

    try:
        fs = os.fstat(f.fileno())
        # If-Modified-Since is evaluated only when If-None-Match is absent.
        if ("If-Modified-Since" in self.headers
                and "If-None-Match" not in self.headers):
            try:
                ims = email.utils.parsedate_to_datetime(
                    self.headers["If-Modified-Since"])
            except (TypeError, IndexError, OverflowError, ValueError):
                # An unparsable date is ignored; the file is sent normally.
                pass
            else:
                if ims.tzinfo is None:
                    # A date without timezone information is taken as UTC.
                    ims = ims.replace(tzinfo=datetime.timezone.utc)
                if ims.tzinfo is datetime.timezone.utc:
                    # Compare against the file's mtime in UTC, truncated
                    # to whole seconds.
                    last_modif = datetime.datetime.fromtimestamp(
                        fs.st_mtime, datetime.timezone.utc)
                    last_modif = last_modif.replace(microsecond=0)

                    if last_modif <= ims:
                        self.send_response(HTTPStatus.NOT_MODIFIED)
                        self.end_headers()
                        f.close()
                        return None

        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", ctype)
        # fs[6] is the st_size field of the stat result.
        self.send_header("Content-Length", str(fs[6]))
        self.send_header("Last-Modified",
                         self.date_time_string(fs.st_mtime))
        self.end_headers()
        return f
    except:
        # Whatever goes wrong, close the file before re-raising.
        f.close()
        raise
def list_directory(self, path):
    """Produce an HTML listing for the directory ``path``.

    Sends a 200 response with Content-type and Content-Length headers and
    returns an ``io.BytesIO`` positioned at the start of the encoded page.
    If the directory cannot be listed, a 404 is sent and None is returned.
    """
    try:
        entries = os.listdir(path)
    except OSError:
        self.send_error(
            HTTPStatus.NOT_FOUND,
            "No permission to list directory")
        return None
    entries.sort(key=lambda entry: entry.lower())

    # The heading shows the request path without fragment and query,
    # percent-decoded and HTML-escaped.
    shown_path = self.path.split('#', 1)[0].split('?', 1)[0]
    try:
        shown_path = urllib.parse.unquote(shown_path,
                                          errors='surrogatepass')
    except UnicodeDecodeError:
        shown_path = urllib.parse.unquote(shown_path)
    shown_path = html.escape(shown_path, quote=False)

    enc = sys.getfilesystemencoding()
    title = f'Directory listing for {shown_path}'
    page = [
        '<!DOCTYPE HTML>',
        '<html lang="en">',
        '<head>',
        f'<meta charset="{enc}">',
        '<style type="text/css">\n:root {\ncolor-scheme: light dark;\n}\n</style>',
        f'<title>{title}</title>\n</head>',
        f'<body>\n<h1>{title}</h1>',
        '<hr>\n<ul>',
    ]
    for name in entries:
        fullname = os.path.join(path, name)
        # Directories link and display with a trailing "/"; symbolic links
        # display with "@" while keeping the link target chosen here.
        linkname = name + "/" if os.path.isdir(fullname) else name
        if os.path.islink(fullname):
            displayname = name + "@"
        else:
            displayname = linkname
        page.append('<li><a href="%s">%s</a></li>'
                    % (urllib.parse.quote(linkname,
                                          errors='surrogatepass'),
                       html.escape(displayname, quote=False)))
    page.append('</ul>\n<hr>\n</body>\n</html>\n')

    encoded = '\n'.join(page).encode(enc, 'surrogateescape')
    listing = io.BytesIO(encoded)
    self.send_response(HTTPStatus.OK)
    self.send_header("Content-type", "text/html; charset=%s" % enc)
    self.send_header("Content-Length", str(len(encoded)))
    self.end_headers()
    return listing
def translate_path(self, path):
    """Map a request path onto a filesystem path below ``self.directory``.

    Fragment and query are dropped, the remainder is percent-decoded and
    normalised with ``posixpath.normpath``.  Components that are empty,
    contain a directory part, or equal ``os.curdir`` / ``os.pardir`` are
    skipped.  A trailing "/" on the decoded path is kept on the result.
    """
    path = path.split('#', 1)[0].split('?', 1)[0]
    try:
        path = urllib.parse.unquote(path, errors='surrogatepass')
    except UnicodeDecodeError:
        path = urllib.parse.unquote(path)
    # Remember the slash now: normpath() strips it.
    keep_slash = path.endswith('/')

    ignored = (os.curdir, os.pardir)
    result = self.directory
    for component in posixpath.normpath(path).split('/'):
        if not component:
            continue
        if os.path.dirname(component) or component in ignored:
            continue
        result = os.path.join(result, component)
    if keep_slash:
        result += '/'
    return result
def copyfile(self, source, outputfile):
    """Copy everything from ``source`` to ``outputfile``.

    Both arguments are file objects; the copy is delegated to
    ``shutil.copyfileobj``.
    """
    shutil.copyfileobj(source, outputfile)
def guess_type(self, path):
    """Return the content type to announce for ``path``.

    The extension is looked up in ``extensions_map`` first as written and
    then lower-cased.  If neither matches, ``mimetypes.guess_file_type``
    is asked, and 'application/octet-stream' is the final fallback.
    """
    suffix = posixpath.splitext(path)[1]
    for candidate in (suffix, suffix.lower()):
        if candidate in self.extensions_map:
            return self.extensions_map[candidate]
    guessed = mimetypes.guess_file_type(path)[0]
    return guessed if guessed else 'application/octet-stream'
def _url_collapse_path(path):
path, _, query = path.partition('?')
path = urllib.parse.unquote(path)
path_parts = path.split('/')
head_parts = []
for part in path_parts[:-1]:
if part == '..':
head_parts.pop() elif part and part != '.':
head_parts.append( part )
if path_parts:
tail_part = path_parts.pop()
if tail_part:
if tail_part == '..':
head_parts.pop()
tail_part = ''
elif tail_part == '.':
tail_part = ''
else:
tail_part = ''
if query:
tail_part = '?'.join((tail_part, query))
splitpath = ('/' + '/'.join(head_parts), tail_part)
collapsed_path = "/".join(splitpath)
return collapsed_path
# Cached result of nobody_uid(); None until the first successful lookup.
nobody = None


def nobody_uid():
    """Return the uid of the user "nobody", caching it in ``nobody``.

    Returns -1 (uncached) when the ``pwd`` module is unavailable.  When no
    user named "nobody" exists, one more than the largest uid listed by
    ``pwd.getpwall()`` is used instead.
    """
    global nobody
    if nobody:
        return nobody
    try:
        import pwd
    except ImportError:
        return -1
    try:
        uid = pwd.getpwnam('nobody').pw_uid
    except KeyError:
        uid = 1 + max(entry.pw_uid for entry in pwd.getpwall())
    nobody = uid
    return uid
def executable(path):
    """Return whether ``os.access`` grants execute permission on ``path``."""
    may_execute = os.access(path, os.X_OK)
    return may_execute
class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
    """SimpleHTTPRequestHandler extended to run CGI scripts.

    Requests whose path lies below one of ``cgi_directories`` are handed
    to ``run_cgi``; GET and HEAD for other paths fall back to the parent
    class, and POST is accepted for CGI scripts only.
    """

    def __init__(self, *args, **kwargs):
        # Instantiating this class emits a deprecation warning; the call
        # below names 3.15 as the removal version.
        import warnings
        warnings._deprecated("http.server.CGIHTTPRequestHandler",
                             remove=(3, 15))
        super().__init__(*args, **kwargs)

    # True when os.fork exists; run_cgi() then forks instead of using the
    # subprocess module.
    have_fork = hasattr(os, 'fork')

    # Buffer size setting inherited from the socketserver handler; 0 is
    # presumably meant to keep rfile unbuffered so a forked script can read
    # the request body itself -- TODO confirm.
    rbufsize = 0
def do_POST(self):
    """Handle POST: run the CGI script, or answer 501 for non-CGI paths."""
    if not self.is_cgi():
        self.send_error(
            HTTPStatus.NOT_IMPLEMENTED,
            "Can only POST to CGI scripts")
        return
    self.run_cgi()
def send_head(self):
    """Run the CGI script for CGI paths; otherwise defer to the parent.

    Returns whatever ``run_cgi`` or ``SimpleHTTPRequestHandler.send_head``
    returns.
    """
    if not self.is_cgi():
        return SimpleHTTPRequestHandler.send_head(self)
    return self.run_cgi()
def is_cgi(self):
    """Report whether ``self.path`` points into a CGI directory.

    The collapsed path is cut at successive "/" positions until the prefix
    is found in ``cgi_directories``.  On a match ``self.cgi_info`` is set
    to ``(directory, rest)`` and True is returned; otherwise False.
    """
    collapsed = _url_collapse_path(self.path)
    cut = collapsed.find('/', 1)
    while cut > 0 and collapsed[:cut] not in self.cgi_directories:
        cut = collapsed.find('/', cut + 1)
    if cut <= 0:
        return False
    self.cgi_info = collapsed[:cut], collapsed[cut + 1:]
    return True
# URL path prefixes that is_cgi() treats as containing CGI scripts.
cgi_directories = ['/cgi-bin', '/htbin']
def is_executable(self, path):
    """Return the result of the module-level ``executable()`` for ``path``."""
    verdict = executable(path)
    return verdict
def is_python(self, path):
    """Return True when ``path`` ends in ".py" or ".pyw" (any letter case)."""
    extension = os.path.splitext(path)[1]
    return extension.lower() in (".py", ".pyw")
def run_cgi(self):
    """Execute the CGI script described by ``self.cgi_info``.

    Locates the script, checks that it exists, is a plain file and (where
    required) is executable, builds the environment, sends the
    "200 Script output follows" status line and then runs the script:
    through fork/execve when ``have_fork`` is true, otherwise through
    ``subprocess``.  Problems found before execution are reported with
    ``send_error``.  Returns None.
    """
    dir, rest = self.cgi_info
    path = dir + '/' + rest
    # Extend ``dir`` for as long as the next path component is itself a
    # directory on disk.
    i = path.find('/', len(dir)+1)
    while i >= 0:
        nextdir = path[:i]
        nextrest = path[i+1:]

        scriptdir = self.translate_path(nextdir)
        if os.path.isdir(scriptdir):
            dir, rest = nextdir, nextrest
            i = path.find('/', len(dir)+1)
        else:
            break

    # Separate an explicit query string, if any.
    rest, _, query = rest.partition('?')

    # Split what remains into the script name and extra path information
    # (the latter ends up in PATH_INFO).
    i = rest.find('/')
    if i >= 0:
        script, rest = rest[:i], rest[i:]
    else:
        script, rest = rest, ''

    scriptname = dir + '/' + script
    scriptfile = self.translate_path(scriptname)
    if not os.path.exists(scriptfile):
        self.send_error(
            HTTPStatus.NOT_FOUND,
            "No such CGI script (%r)" % scriptname)
        return
    if not os.path.isfile(scriptfile):
        self.send_error(
            HTTPStatus.FORBIDDEN,
            "CGI script is not a plain file (%r)" % scriptname)
        return
    ispy = self.is_python(scriptname)
    # The executable check is skipped only for Python scripts on
    # platforms without fork.
    if self.have_fork or not ispy:
        if not self.is_executable(scriptfile):
            self.send_error(
                HTTPStatus.FORBIDDEN,
                "CGI script is not executable (%r)" % scriptname)
            return

    # Build the script's environment from a copy of os.environ.
    env = copy.deepcopy(os.environ)
    env['SERVER_SOFTWARE'] = self.version_string()
    env['SERVER_NAME'] = self.server.server_name
    env['GATEWAY_INTERFACE'] = 'CGI/1.1'
    env['SERVER_PROTOCOL'] = self.protocol_version
    env['SERVER_PORT'] = str(self.server.server_port)
    env['REQUEST_METHOD'] = self.command
    uqrest = urllib.parse.unquote(rest)
    env['PATH_INFO'] = uqrest
    env['PATH_TRANSLATED'] = self.translate_path(uqrest)
    env['SCRIPT_NAME'] = scriptname
    env['QUERY_STRING'] = query
    env['REMOTE_ADDR'] = self.client_address[0]
    authorization = self.headers.get("authorization")
    if authorization:
        authorization = authorization.split()
        if len(authorization) == 2:
            import base64, binascii
            env['AUTH_TYPE'] = authorization[0]
            if authorization[0].lower() == "basic":
                # For Basic credentials the user name is extracted into
                # REMOTE_USER; undecodable credentials are ignored.
                try:
                    authorization = authorization[1].encode('ascii')
                    authorization = base64.decodebytes(authorization).\
                                    decode('ascii')
                except (binascii.Error, UnicodeError):
                    pass
                else:
                    authorization = authorization.split(':')
                    if len(authorization) == 2:
                        env['REMOTE_USER'] = authorization[0]
    if self.headers.get('content-type') is None:
        env['CONTENT_TYPE'] = self.headers.get_content_type()
    else:
        env['CONTENT_TYPE'] = self.headers['content-type']
    length = self.headers.get('content-length')
    if length:
        env['CONTENT_LENGTH'] = length
    referer = self.headers.get('referer')
    if referer:
        env['HTTP_REFERER'] = referer
    accept = self.headers.get_all('accept', ())
    env['HTTP_ACCEPT'] = ','.join(accept)
    ua = self.headers.get('user-agent')
    if ua:
        env['HTTP_USER_AGENT'] = ua
    co = filter(None, self.headers.get_all('cookie', []))
    cookie_str = ', '.join(co)
    if cookie_str:
        env['HTTP_COOKIE'] = cookie_str
    # Make sure these keys exist; setdefault() keeps values that are
    # already present, including ones copied from os.environ.
    for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
              'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
        env.setdefault(k, "")

    self.send_response(HTTPStatus.OK, "Script output follows")
    self.flush_headers()

    decoded_query = query.replace('+', ' ')

    if self.have_fork:
        # Fork, then exec the script in the child.
        args = [script]
        # A query without "=" is passed to the script as an argument.
        if '=' not in decoded_query:
            args.append(decoded_query)
        nobody = nobody_uid()
        self.wfile.flush()  # flush pending output before forking
        pid = os.fork()
        if pid != 0:
            # Parent: wait for the child, drain unread request data, log
            # a non-zero exit code.
            pid, sts = os.waitpid(pid, 0)
            while select.select([self.rfile], [], [], 0)[0]:
                if not self.rfile.read(1):
                    break
            exitcode = os.waitstatus_to_exitcode(sts)
            if exitcode:
                self.log_error(f"CGI script exit code {exitcode}")
            return
        # Child: try to switch to the "nobody" uid (failure is ignored),
        # wire the connection to stdin/stdout, then replace the process
        # image with the script.
        try:
            try:
                os.setuid(nobody)
            except OSError:
                pass
            os.dup2(self.rfile.fileno(), 0)
            os.dup2(self.wfile.fileno(), 1)
            os.execve(scriptfile, args, env)
        except:
            # Report the failure and terminate the child immediately.
            self.server.handle_error(self.request, self.client_address)
            os._exit(127)

    else:
        # No fork available: run the script through subprocess.
        import subprocess
        cmdline = [scriptfile]
        if self.is_python(scriptfile):
            interp = sys.executable
            if interp.lower().endswith("w.exe"):
                # Drop the "w" before ".exe" (pythonw.exe -> python.exe).
                interp = interp[:-5] + interp[-4:]
            cmdline = [interp, '-u'] + cmdline
        # NOTE(review): this branch appends the raw ``query`` while the
        # fork branch appends ``decoded_query`` -- confirm this is intended.
        if '=' not in query:
            cmdline.append(query)
        self.log_message("command: %s", subprocess.list2cmdline(cmdline))
        try:
            nbytes = int(length)
        except (TypeError, ValueError):
            nbytes = 0
        p = subprocess.Popen(cmdline,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env = env
                             )
        if self.command.lower() == "post" and nbytes > 0:
            cursize = 0
            data = self.rfile.read(min(nbytes, _MIN_READ_BUF_SIZE))
            # Keep reading until nbytes have arrived or a read adds
            # nothing; each read asks for at most the amount already
            # held, so the buffer at most doubles per iteration.
            while len(data) < nbytes and len(data) != cursize:
                cursize = len(data)
                delta = min(cursize, nbytes - cursize)
                try:
                    data += self.rfile.read(delta)
                except TimeoutError:
                    break
        else:
            data = None
        # Drain any request data still pending on the socket.
        while select.select([self.rfile._sock], [], [], 0)[0]:
            if not self.rfile._sock.recv(1):
                break
        stdout, stderr = p.communicate(data)
        self.wfile.write(stdout)
        if stderr:
            self.log_error('%s', stderr)
        p.stderr.close()
        p.stdout.close()
        status = p.returncode
        if status:
            self.log_error("CGI script exit status %#x", status)
        else:
            self.log_message("CGI script exited OK")
def _get_best_family(*address):
infos = socket.getaddrinfo(
*address,
type=socket.SOCK_STREAM,
flags=socket.AI_PASSIVE,
)
family, type, proto, canonname, sockaddr = next(iter(infos))
return family, sockaddr
def test(HandlerClass=BaseHTTPRequestHandler,
         ServerClass=ThreadingHTTPServer,
         protocol="HTTP/1.0", port=8000, bind=None,
         tls_cert=None, tls_key=None, tls_password=None):
    """Start a server and serve requests until interrupted.

    The address family and bind address come from ``_get_best_family``.
    ``protocol`` is stored on ``HandlerClass`` as ``protocol_version``.
    When ``tls_cert`` is given, the TLS arguments are passed to
    ``ServerClass`` as ``certfile``, ``keyfile`` and ``password``.  A
    KeyboardInterrupt prints a message and exits with status 0.
    """
    ServerClass.address_family, addr = _get_best_family(bind, port)
    HandlerClass.protocol_version = protocol

    tls_options = {}
    if tls_cert:
        tls_options = dict(certfile=tls_cert, keyfile=tls_key,
                           password=tls_password)
    server = ServerClass(addr, HandlerClass, **tls_options)

    with server as httpd:
        host, port = httpd.socket.getsockname()[:2]
        # Addresses containing ":" are bracketed inside the URL.
        url_host = f'[{host}]' if ':' in host else host
        protocol = 'HTTPS' if tls_cert else 'HTTP'
        print(
            f"Serving {protocol} on {host} port {port} "
            f"({protocol.lower()}://{url_host}:{port}/) ..."
        )
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)
if __name__ == '__main__':
    # Command-line entry point: parse the options, pick the handler and
    # server classes, then hand over to test().
    import argparse
    import contextlib

    parser = argparse.ArgumentParser(color=True)
    parser.add_argument('--cgi', action='store_true',
                        help='run as CGI server')
    parser.add_argument('-b', '--bind', metavar='ADDRESS',
                        help='bind to this address '
                             '(default: all interfaces)')
    parser.add_argument('-d', '--directory', default=os.getcwd(),
                        help='serve this directory '
                             '(default: current directory)')
    parser.add_argument('-p', '--protocol', metavar='VERSION',
                        default='HTTP/1.0',
                        help='conform to this HTTP version '
                             '(default: %(default)s)')
    parser.add_argument('--tls-cert', metavar='PATH',
                        help='path to the TLS certificate chain file')
    parser.add_argument('--tls-key', metavar='PATH',
                        help='path to the TLS key file')
    parser.add_argument('--tls-password-file', metavar='PATH',
                        help='path to the password file for the TLS key')
    parser.add_argument('port', default=8000, type=int, nargs='?',
                        help='bind to this port '
                             '(default: %(default)s)')
    args = parser.parse_args()

    # The key and password options are only accepted together with a
    # certificate.
    if not args.tls_cert and args.tls_key:
        parser.error("--tls-key requires --tls-cert to be set")

    tls_key_password = None
    if args.tls_password_file:
        if not args.tls_cert:
            parser.error("--tls-password-file requires --tls-cert to be set")

        # The password is the file's content with surrounding whitespace
        # stripped.
        try:
            with open(args.tls_password_file, "r", encoding="utf-8") as f:
                tls_key_password = f.read().strip()
        except OSError as e:
            parser.error(f"Failed to read TLS password file: {e}")

    if args.cgi:
        handler_class = CGIHTTPRequestHandler
    else:
        handler_class = SimpleHTTPRequestHandler

    class DualStackServerMixin:
        # Mixin that clears IPV6_V6ONLY before binding and passes the
        # --directory option on to every handler instance.

        def server_bind(self):
            # Best effort: any exception raised by setsockopt() is
            # suppressed (presumably for sockets that are not IPv6 --
            # TODO confirm).
            with contextlib.suppress(Exception):
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            return super().server_bind()

        def finish_request(self, request, client_address):
            # ``args`` is the namespace parsed above, captured by closure.
            self.RequestHandlerClass(request, client_address, self,
                                     directory=args.directory)

    class HTTPDualStackServer(DualStackServerMixin, ThreadingHTTPServer):
        pass

    class HTTPSDualStackServer(DualStackServerMixin, ThreadingHTTPSServer):
        pass

    ServerClass = HTTPSDualStackServer if args.tls_cert else HTTPDualStackServer

    test(
        HandlerClass=handler_class,
        ServerClass=ServerClass,
        port=args.port,
        bind=args.bind,
        protocol=args.protocol,
        tls_cert=args.tls_cert,
        tls_key=args.tls_key,
        tls_password=tls_key_password,
    )