import logging
import paramiko
import socket
import threading
logger = logging.getLogger("server")
class Server(paramiko.ServerInterface):
def __init__(self):
self.exec_events = {}
self.exec_commands = {}
def check_channel_request(self, kind, chanid):
if kind == "session":
self.exec_events[chanid] = threading.Event()
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_password(self, username, password):
if (username, password) == ("alice", "alicealice"):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
allowed_keys = [
("edward", "ssh-ed25519", "ad215301215ca80b7083cd49b5f7be54"),
("ruth", "ssh-rsa", "ae86f75870515995b6726faacf8a1ac8"),
("ruth", "ssh-rsa", "3b18ce162d26a656a47bdd62095139f5"),
("ruth", "ssh-rsa", "0c3f4a5b7c25f26e11e93dd8126c6e81"),
("eda", "ecdsa-sha2-nistp256", "a05caef0cdf7630ffa1bf7ce4ac17bbd"),
("eda", "ecdsa-sha2-nistp384", "e7739d20c38730d1336a498b3e2e8dd9"),
]
entry = (username, key.get_name(), key.get_fingerprint().hex())
if entry in allowed_keys:
return paramiko.AUTH_SUCCESSFUL
print(entry, flush=True)
return paramiko.AUTH_FAILED
def check_auth_none(self, username):
if username == "queen":
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_channel_shell_request(self, channel):
return True
def check_channel_exec_request(self, channel, command):
self.exec_commands[channel.chanid] = command
self.exec_events[channel.chanid].set()
return True
def check_channel_pty_request(self, channel, *args):
return True
def get_allowed_auths(self, username):
if username == "alice":
return "password"
return ""
def run_channel(server, channel):
logger.info(f"opened channel {channel.chanid}")
server.exec_events[channel.chanid].wait()
command = server.exec_commands[channel.chanid]
logger.info(f"received command {command!r} on channel {channel.chanid}")
if command == b"whoami":
channel.send(b"alice\n")
channel.send_exit_status(0)
elif command == b"cat":
data_len = 0
while data := channel.recv(1024):
logger.info(f"received {len(data)} bytes")
while data:
sent_len = channel.send(data)
logger.info(f"sent {len(data)} bytes")
data = data[sent_len:]
data_len += len(data)
logger.info(f"received eof after processing {data_len} bytes")
if not channel.closed:
channel.send_exit_status(0)
elif command == b"true":
channel.send_exit_status(0)
elif command == b"false":
channel.send_exit_status(1)
else:
channel.send_stderr(b"unknown command!\n")
channel.send_exit_status(127)
channel.close()
logger.info(f"closed channel {channel.chanid}")
def run_client(server_keys, client_sock, client_addr):
logger.info(f"received connection from {client_addr!r}")
trans = paramiko.Transport(client_sock)
try:
for key in server_keys:
trans.add_server_key(key)
server = Server()
trans.start_server(server=server)
threads = []
while (channel := trans.accept(None)) is not None:
thread = threading.Thread(target=run_channel, args=(server, channel,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
finally:
trans.close()
client_sock.close()
def run_server():
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1, SECP384R1
server_keys = [
paramiko.rsakey.RSAKey.generate(1024),
paramiko.ed25519key.Ed25519Key.from_private_key_file("host_key_ed25519"),
paramiko.ecdsakey.ECDSAKey.generate(SECP256R1),
paramiko.ecdsakey.ECDSAKey.generate(SECP384R1),
]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", 22))
sock.listen(100)
logger.info("listening on port 22")
while True:
client_sock, client_addr = sock.accept()
thread = threading.Thread(target=run_client, args=(server_keys, client_sock, client_addr))
thread.start()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
run_server()