# makiko 0.2.5
#
# Asynchronous SSH client library in pure Rust
# Documentation
import logging
import paramiko
import socket
import threading

# Shared logger for this module; all functions below log through the "server" logger.
logger = logging.getLogger("server")

class Server(paramiko.ServerInterface):
    """Server-side callbacks with fixed credentials and per-channel exec state.

    ``exec_events`` maps a channel id to a ``threading.Event`` that is set
    once an exec request for that channel has been recorded, and
    ``exec_commands`` maps a channel id to the requested command.
    """

    def __init__(self):
        # Both dicts are keyed by channel id.
        self.exec_commands = {}
        self.exec_events = {}

    def check_channel_request(self, kind, chanid):
        """Accept only "session" channels and create the exec event for them."""
        if kind != "session":
            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
        self.exec_events[chanid] = threading.Event()
        return paramiko.OPEN_SUCCEEDED

    def check_auth_password(self, username, password):
        """Accept the single hard-coded username/password pair."""
        matches = (username, password) == ("alice", "alicealice")
        return paramiko.AUTH_SUCCESSFUL if matches else paramiko.AUTH_FAILED

    def check_auth_publickey(self, username, key):
        """Accept a key whose (user, key type, fingerprint hex) triple is listed.

        A triple that is not listed is printed to stdout before the
        request is rejected.
        """
        accepted = (
            ("edward", "ssh-ed25519", "ad215301215ca80b7083cd49b5f7be54"),
            ("ruth", "ssh-rsa", "ae86f75870515995b6726faacf8a1ac8"),
            ("ruth", "ssh-rsa", "3b18ce162d26a656a47bdd62095139f5"),
            ("ruth", "ssh-rsa", "0c3f4a5b7c25f26e11e93dd8126c6e81"),
            ("eda", "ecdsa-sha2-nistp256", "a05caef0cdf7630ffa1bf7ce4ac17bbd"),
            ("eda", "ecdsa-sha2-nistp384", "e7739d20c38730d1336a498b3e2e8dd9"),
        )

        candidate = (username, key.get_name(), key.get_fingerprint().hex())
        if candidate not in accepted:
            # Show the rejected triple so it can be compared with the table above.
            print(candidate, flush=True)
            return paramiko.AUTH_FAILED
        return paramiko.AUTH_SUCCESSFUL

    def check_auth_none(self, username):
        """Allow "none" authentication for the user "queen" only."""
        return paramiko.AUTH_SUCCESSFUL if username == "queen" else paramiko.AUTH_FAILED

    def check_channel_shell_request(self, channel):
        """Accept every shell request."""
        return True

    def check_channel_exec_request(self, channel, command):
        """Record the command for the channel, then signal its exec event."""
        chanid = channel.chanid
        self.exec_commands[chanid] = command
        self.exec_events[chanid].set()
        return True

    def check_channel_pty_request(self, channel, *args):
        """Accept every pty request, ignoring the requested parameters."""
        return True

    def get_allowed_auths(self, username):
        """Return "password" for "alice" and an empty string for anyone else."""
        return "password" if username == "alice" else ""

def _echo_chunk(channel, data):
    """Send ``data`` back on ``channel`` and return how many bytes were sent.

    The result is smaller than ``len(data)`` only when ``channel.send``
    reports 0 bytes, in which case sending stops instead of retrying forever.
    """
    sent_total = 0
    while data:
        sent_len = channel.send(data)
        if sent_len == 0:
            # No progress is possible; retrying would spin on the same data.
            break
        logger.info(f"sent {sent_len} bytes")
        sent_total += sent_len
        data = data[sent_len:]
    return sent_total


def run_channel(server, channel, poll_interval=1.0):
    """Serve one session channel: wait for its exec request and run the command.

    Supported commands are ``whoami``, ``cat`` (echoes received data back),
    ``true`` and ``false``; anything else writes an error to stderr and
    reports exit status 127. The channel is closed before returning.

    Args:
        server: the ``Server`` holding ``exec_events`` and ``exec_commands``
            for this channel's id.
        channel: the accepted channel to serve.
        poll_interval: seconds between checks of ``channel.closed`` while
            waiting for the exec request.
    """
    chanid = channel.chanid
    logger.info(f"opened channel {chanid}")

    # Wait for the exec request, but give up once the channel is closed so
    # this thread (and whoever joins it) cannot block forever.
    exec_event = server.exec_events[chanid]
    while not exec_event.wait(poll_interval):
        if channel.closed:
            logger.info(f"channel {chanid} closed before an exec request")
            return

    command = server.exec_commands[chanid]
    logger.info(f"received command {command!r} on channel {chanid}")

    if command == b"whoami":
        channel.send(b"alice\n")
        channel.send_exit_status(0)
    elif command == b"cat":
        data_len = 0
        reached_eof = True
        while data := channel.recv(1024):
            logger.info(f"received {len(data)} bytes")
            echoed = _echo_chunk(channel, data)
            data_len += echoed
            if echoed < len(data):
                # The chunk could not be fully echoed; stop reading more input.
                reached_eof = False
                break
        if reached_eof:
            logger.info(f"received eof after processing {data_len} bytes")
        else:
            logger.info(f"stopped echoing after processing {data_len} bytes")
        if not channel.closed:
            channel.send_exit_status(0)
    elif command == b"true":
        channel.send_exit_status(0)
    elif command == b"false":
        channel.send_exit_status(1)
    else:
        channel.send_stderr(b"unknown command!\n")
        channel.send_exit_status(127)

    channel.close()
    logger.info(f"closed channel {chanid}")

def run_client(server_keys, client_sock, client_addr):
    """Handle one accepted connection until no more channels are accepted.

    A transport is created on ``client_sock``, every key in ``server_keys``
    is registered with it, and each accepted channel is served by
    ``run_channel`` in its own thread. Once ``accept`` returns ``None`` all
    channel threads are joined. The transport and the socket are closed on
    every exit path.
    """
    logger.info(f"received connection from {client_addr!r}")

    transport = paramiko.Transport(client_sock)
    try:
        for host_key in server_keys:
            transport.add_server_key(host_key)

        handler = Server()
        transport.start_server(server=handler)

        workers = []
        while True:
            channel = transport.accept(None)
            if channel is None:
                break
            worker = threading.Thread(target=run_channel, args=(handler, channel))
            worker.start()
            workers.append(worker)

        # Let every channel finish before tearing the connection down.
        for worker in workers:
            worker.join()
    finally:
        transport.close()
        client_sock.close()

def run_server(host="", port=22):
    """Listen for TCP connections and serve each one in its own thread.

    Args:
        host: address to bind to; the default empty string is what the
            server has always bound to.
        port: TCP port to listen on; defaults to 22.

    The function loops forever; it only returns by raising. The listening
    socket is closed when that happens.
    """
    from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1, SECP384R1

    # RSA and ECDSA keys are generated per run; the Ed25519 key is read from
    # "host_key_ed25519", resolved against the current working directory.
    # NOTE(review): 1024-bit RSA is weak; presumably acceptable because this
    # looks like a test server. Confirm before using it anywhere else.
    server_keys = [
        paramiko.rsakey.RSAKey.generate(1024),
        paramiko.ed25519key.Ed25519Key.from_private_key_file("host_key_ed25519"),
        paramiko.ecdsakey.ECDSAKey.generate(SECP256R1),
        paramiko.ecdsakey.ECDSAKey.generate(SECP384R1),
    ]

    # The context manager closes the listening socket if the loop is
    # interrupted by an exception (for example KeyboardInterrupt).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(100)
        logger.info(f"listening on port {port}")

        while True:
            client_sock, client_addr = sock.accept()
            thread = threading.Thread(
                target=run_client,
                args=(server_keys, client_sock, client_addr),
            )
            thread.start()

# When executed as a script: enable DEBUG-level logging, then start the server.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    run_server()