metalssh 0.0.1

Experimental SSH implementation
#!/usr/bin/env python3
"""
Extract SSH packets from pcap file, one per .bin file, in chronological order.
Handles multiple SSH packets within a single TCP packet and MAC extraction.
"""

import subprocess
import struct
from pathlib import Path

# Common MAC algorithms and their lengths
MAC_LENGTHS = {
    'hmac-sha1': 20,
    'hmac-sha1-96': 12,
    'hmac-sha2-256': 32,
    'hmac-sha2-256-96': 12,
    'hmac-sha2-512': 64,
    'hmac-sha2-512-96': 12,
    'hmac-md5': 16,
    'hmac-md5-96': 12,
    'umac-64@openssh.com': 8,
    'umac-128@openssh.com': 16,
    'hmac-sha1-etm@openssh.com': 20,
    'hmac-sha2-256-etm@openssh.com': 32,
    'hmac-sha2-512-etm@openssh.com': 64,
    'umac-64-etm@openssh.com': 8,
    'umac-128-etm@openssh.com': 16,
}

def parse_string(data, offset):
    """Parse an SSH string (4-byte length + data)."""
    if len(data) < offset + 4:
        return None, offset
    length = struct.unpack('>I', data[offset:offset+4])[0]
    if len(data) < offset + 4 + length:
        return None, offset
    return data[offset+4:offset+4+length], offset + 4 + length

def parse_name_list(data, offset):
    """Parse an SSH name-list."""
    name_bytes, new_offset = parse_string(data, offset)
    if name_bytes is None:
        return [], offset
    names = name_bytes.decode('utf-8', errors='ignore').split(',')
    return names, new_offset

def parse_kexinit(data):
    """Parse KEXINIT message to extract MAC algorithms."""
    offset = 0

    # Skip packet length (4) + padding length (1) + message code (1)
    offset = 6

    # Skip cookie (16 bytes)
    offset += 16

    # Parse name-lists in order:
    # 1. kex_algorithms
    kex_algs, offset = parse_name_list(data, offset)

    # 2. server_host_key_algorithms
    _, offset = parse_name_list(data, offset)

    # 3. encryption_algorithms_client_to_server
    enc_c2s, offset = parse_name_list(data, offset)

    # 4. encryption_algorithms_server_to_client
    enc_s2c, offset = parse_name_list(data, offset)

    # 5. mac_algorithms_client_to_server
    mac_c2s, offset = parse_name_list(data, offset)

    # 6. mac_algorithms_server_to_client
    mac_s2c, offset = parse_name_list(data, offset)

    return {
        'mac_c2s': mac_c2s[0] if mac_c2s else None,
        'mac_s2c': mac_s2c[0] if mac_s2c else None,
        'enc_c2s': enc_c2s[0] if enc_c2s else None,
        'enc_s2c': enc_s2c[0] if enc_s2c else None,
    }

def extract_ssh_packets(pcap_path, output_dir):
    """Extract SSH packets from pcap and save individually."""

    # Extract per-frame TCP payloads
    cmd = [
        'tshark', '-r', pcap_path,
        '-Y', 'tcp.stream eq 0 and tcp.len > 0',
        '-T', 'fields',
        '-e', 'frame.number',
        '-e', 'tcp.srcport',
        '-e', 'tcp.dstport',
        '-e', 'tcp.payload',
        '-E', 'separator=|'
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)

    packets = []
    client_mac_len = 0
    server_mac_len = 0
    newkeys_seen = {'client': False, 'server': False}

    # We need to determine the server and client ports first
    lines = result.stdout.strip().split('\n')
    if not lines:
        return packets

    # First line should be the client banner
    first_parts = lines[0].split('|')
    client_port = int(first_parts[1])
    server_port = int(first_parts[2])

    for line in lines:
        if not line:
            continue

        parts = line.split('|')
        if len(parts) < 4:
            continue

        frame_num = int(parts[0])
        src_port = int(parts[1])
        dst_port = int(parts[2])
        payload_hex = parts[3].replace(':', '')

        if not payload_hex:
            continue

        payload = bytes.fromhex(payload_hex)

        # Determine direction
        direction = 'client' if src_port == client_port else 'server'

        # Determine current MAC length for this direction
        current_mac_len = client_mac_len if direction == 'client' else server_mac_len
        has_newkeys = newkeys_seen[direction]

        # Check if this is a banner (starts with SSH-2.0)
        if payload.startswith(b'SSH-2.0'):
            # Banner packet
            banner_end = payload.find(b'\r\n')
            if banner_end != -1:
                banner = payload[:banner_end + 2]
                packets.append({
                    'frame': frame_num,
                    'direction': direction,
                    'data': banner,
                    'is_banner': True,
                    'packet_type': 'Banner'
                })

                # Check if there's more data after the banner
                remaining = payload[banner_end + 2:]
                if remaining:
                    # Parse SSH packets from remaining data
                    parsed, client_mac_len, server_mac_len, newkeys_seen = parse_ssh_packets(
                        remaining, frame_num, direction, current_mac_len,
                        client_mac_len, server_mac_len, newkeys_seen, has_newkeys
                    )
                    packets.extend(parsed)
        else:
            # Regular SSH packet(s)
            parsed, client_mac_len, server_mac_len, newkeys_seen = parse_ssh_packets(
                payload, frame_num, direction, current_mac_len,
                client_mac_len, server_mac_len, newkeys_seen, has_newkeys
            )
            packets.extend(parsed)

    return packets

def parse_ssh_packets(data, frame_num, direction, mac_len, client_mac_len, server_mac_len, newkeys_seen, has_newkeys):
    """Parse potentially multiple SSH packets from data, including MACs after NEWKEYS."""
    packets = []
    offset = 0
    # Track whether we've seen NEWKEYS in THIS parsing session
    newkeys_in_this_frame = has_newkeys

    while offset < len(data):
        if len(data) - offset < 5:  # Need at least packet length + padding length
            break

        # SSH packet format: 4 bytes length + payload (padding_length + message + padding) [+ MAC]
        packet_length = struct.unpack('>I', data[offset:offset+4])[0]

        # After NEWKEYS, packets include MAC
        mac_size = mac_len if newkeys_in_this_frame else 0

        # Total packet size = 4 (length field) + packet_length + mac_size
        total_size = 4 + packet_length + mac_size

        if offset + total_size > len(data):
            # Incomplete packet or wrong MAC size
            # This might be the first encrypted packet with unknown MAC size
            # Try to infer from remaining data
            if offset + 4 + packet_length <= len(data):
                # No MAC yet (before NEWKEYS)
                total_size = 4 + packet_length
                mac_size = 0
            else:
                break

        packet_data = data[offset:offset + total_size]

        # Try to determine packet type
        packet_type = 'SSH Packet'
        if len(packet_data) >= 6:
            padding_len = packet_data[4]
            msg_code = packet_data[5]

            if msg_code == 20:  # SSH_MSG_KEXINIT
                packet_type = 'KEXINIT'
                # Parse to get MAC algorithms
                try:
                    kex_info = parse_kexinit(packet_data)

                    # Determine which MAC length to use based on direction
                    if direction == 'client':
                        mac_name = kex_info['mac_c2s']
                        if mac_name and mac_name in MAC_LENGTHS:
                            client_mac_len = MAC_LENGTHS[mac_name]
                    else:
                        mac_name = kex_info['mac_s2c']
                        if mac_name and mac_name in MAC_LENGTHS:
                            server_mac_len = MAC_LENGTHS[mac_name]
                except:
                    pass

            elif msg_code == 21:  # SSH_MSG_NEWKEYS
                packet_type = 'NEWKEYS'
                # After NEWKEYS, subsequent packets in THIS parse session will have MAC
                newkeys_seen[direction] = True
                newkeys_in_this_frame = True
                # Update mac_len for subsequent packets in this same TCP frame
                if direction == 'client':
                    mac_len = client_mac_len
                else:
                    mac_len = server_mac_len

        packets.append({
            'frame': frame_num,
            'direction': direction,
            'data': packet_data,
            'is_banner': False,
            'packet_type': packet_type,
            'mac_len': mac_size
        })

        offset += total_size

    return packets, client_mac_len, server_mac_len, newkeys_seen

def main():
    pcap_path = '/Users/pierce/code/metalssh/metalssh/testdata/none-exec/none-exec.pcapng'
    output_dir = Path('/Users/pierce/code/metalssh/metalssh/testdata/none-exec')

    # First, clean up old .bin files
    for old_file in output_dir.glob('*.bin'):
        old_file.unlink()

    packets = extract_ssh_packets(pcap_path, output_dir)

    # Sort by frame number to maintain chronological order
    packets.sort(key=lambda p: p['frame'])

    # Separate banners from regular packets
    client_banner = None
    server_banner = None
    regular_packets = []

    for p in packets:
        if p.get('is_banner'):
            if p['direction'] == 'client':
                client_banner = p
            else:
                server_banner = p
        else:
            regular_packets.append(p)

    # Build final ordered list: banners first (in order they appeared), then regular packets
    final_packets = []

    if client_banner and server_banner:
        if client_banner['frame'] < server_banner['frame']:
            final_packets = [client_banner, server_banner] + regular_packets
        else:
            final_packets = [server_banner, client_banner] + regular_packets
    elif client_banner:
        final_packets = [client_banner] + regular_packets
    elif server_banner:
        final_packets = [server_banner] + regular_packets
    else:
        final_packets = regular_packets

    # Save packets and build table
    table_rows = []
    table_rows.append('| Packet # | File | Direction | Frame # | Size (bytes) | Type | MAC Length |')
    table_rows.append('|----------|------|-----------|---------|--------------|------|------------|')

    for idx, packet in enumerate(final_packets):
        direction = packet['direction']
        frame = packet['frame']
        data = packet['data']
        packet_type = packet.get('packet_type', 'SSH Packet')
        mac_len = packet.get('mac_len', 0)

        # Determine message type suffix for filename
        type_suffix = ''
        if packet.get('is_banner'):
            type_suffix = '-banner'
        elif packet_type == 'KEXINIT':
            type_suffix = '-kexinit'
        elif packet_type == 'NEWKEYS':
            type_suffix = '-newkeys'
        else:
            # Try to parse the message code for other packet types
            if len(data) >= 6:
                msg_code = data[5]
                msg_types = {
                    1: '-disconnect',
                    2: '-ignore',
                    3: '-unimplemented',
                    4: '-debug',
                    5: '-service_request',
                    6: '-service_accept',
                    7: '-ext_info',
                    20: '-kexinit',
                    21: '-newkeys',
                    30: '-kexdh_init',
                    31: '-kexdh_reply',
                    50: '-userauth_request',
                    51: '-userauth_failure',
                    52: '-userauth_success',
                    53: '-userauth_banner',
                    60: '-userauth_pk_ok',
                    80: '-global_request',
                    81: '-request_success',
                    82: '-request_failure',
                    90: '-channel_open',
                    91: '-channel_open_confirmation',
                    92: '-channel_open_failure',
                    93: '-channel_window_adjust',
                    94: '-channel_data',
                    95: '-channel_extended_data',
                    96: '-channel_eof',
                    97: '-channel_close',
                    98: '-channel_request',
                    99: '-channel_success',
                    100: '-channel_failure',
                }
                type_suffix = msg_types.get(msg_code, '')

        filename = f"{idx:02d}-{direction}{type_suffix}.bin"
        filepath = output_dir / filename

        # Write the packet
        with open(filepath, 'wb') as f:
            f.write(data)

        mac_info = f"{mac_len}" if mac_len > 0 else "0"
        table_rows.append(f"| {idx} | `{filename}` | {direction} | {frame} | {len(data)} | {packet_type} | {mac_info} |")

        print(f"Wrote {filename} ({len(data)} bytes, frame {frame}, type: {packet_type})")

    # Write the markdown table
    table_path = output_dir / 'none-exec-packets.md'
    with open(table_path, 'w') as f:
        f.write('# SSH Packets from none-exec.pcapng\n\n')
        f.write('Extracted SSH packets in chronological order.\n\n')
        f.write('**Note:** Each .bin file contains a complete SSH packet including:\n')
        f.write('- Packet length field (4 bytes)\n')
        f.write('- Padding length (1 byte)\n')
        f.write('- Payload (variable)\n')
        f.write('- Padding (variable)\n')
        f.write('- MAC (variable, only present after NEWKEYS)\n\n')
        f.write('\n'.join(table_rows))
        f.write('\n')

    print(f"\nWrote packet table to {table_path}")
    print(f"Total packets extracted: {len(final_packets)}")

if __name__ == '__main__':
    main()