import subprocess
import struct
from pathlib import Path
# Length in bytes of the MAC tag appended to each SSH packet, keyed by the
# algorithm name as it appears in a KEXINIT name-list.  The "-96" variants
# truncate the digest to 96 bits (12 bytes); "-etm" (encrypt-then-MAC)
# variants produce a tag of the same size as their non-ETM counterpart.
MAC_LENGTHS = {
    # Classic (MAC-then-encrypt) algorithms.
    'hmac-sha1': 20,
    'hmac-sha1-96': 12,
    'hmac-sha2-256': 32,
    'hmac-sha2-256-96': 12,
    'hmac-sha2-512': 64,
    'hmac-sha2-512-96': 12,
    'hmac-md5': 16,
    'hmac-md5-96': 12,
    'umac-64@openssh.com': 8,
    'umac-128@openssh.com': 16,
    # OpenSSH encrypt-then-MAC variants.
    'hmac-sha1-etm@openssh.com': 20,
    'hmac-sha1-96-etm@openssh.com': 12,
    'hmac-sha2-256-etm@openssh.com': 32,
    'hmac-sha2-512-etm@openssh.com': 64,
    'hmac-md5-etm@openssh.com': 16,
    'hmac-md5-96-etm@openssh.com': 12,
    'umac-64-etm@openssh.com': 8,
    'umac-128-etm@openssh.com': 16,
}
def parse_string(data, offset):
    """Read one length-prefixed string from *data* starting at *offset*.

    The string is encoded as a 4-byte big-endian unsigned length followed by
    that many bytes of content.

    Returns:
        ``(content, next_offset)`` on success, where ``next_offset`` points
        just past the string.  If *data* is too short to hold either the
        length prefix or the announced content, ``(None, offset)`` is
        returned with the offset left unchanged.
    """
    prefix_end = offset + 4
    if prefix_end > len(data):
        return None, offset

    (length,) = struct.unpack('>I', data[offset:prefix_end])
    content_end = prefix_end + length
    if content_end > len(data):
        return None, offset

    return data[prefix_end:content_end], content_end
def parse_name_list(data, offset):
    """Read a comma-separated name-list stored as a length-prefixed string.

    Args:
        data: Buffer containing the encoded name-list.
        offset: Index of the name-list's 4-byte length prefix.

    Returns:
        ``(names, next_offset)``.  ``names`` is a list of ``str``; an empty
        name-list yields ``[]`` (not ``['']``).  If the underlying string
        cannot be read because *data* is too short, ``([], offset)`` is
        returned with the offset left unchanged.
    """
    name_bytes, new_offset = parse_string(data, offset)
    if name_bytes is None:
        return [], offset

    text = name_bytes.decode('utf-8', errors='ignore')
    # ''.split(',') would produce [''], i.e. one bogus empty name, so an
    # empty list has to be handled explicitly.
    names = text.split(',') if text else []
    return names, new_offset
def parse_kexinit(data):
    """Extract each side's first-listed cipher and MAC from a KEXINIT packet.

    *data* is the whole packet starting at its 4-byte length field.  Parsing
    begins 22 bytes in: 6 header bytes (packet length, padding length,
    message code) plus 16 further bytes (the cookie in the RFC 4253 KEXINIT
    layout).  Six consecutive name-lists are then read; the first two are
    discarded and the remaining four are the client-to-server /
    server-to-client encryption lists followed by the two MAC lists.

    Returns:
        A dict with keys ``mac_c2s``, ``mac_s2c``, ``enc_c2s`` and
        ``enc_s2c``.  Each value is the first name of the corresponding list,
        or ``None`` when that list is empty or could not be read.
    """
    cursor = 6 + 16

    name_lists = []
    for _ in range(6):
        names, cursor = parse_name_list(data, cursor)
        name_lists.append(names)
    _, _, enc_c2s, enc_s2c, mac_c2s, mac_s2c = name_lists

    def first_or_none(names):
        return names[0] if names else None

    return {
        'mac_c2s': first_or_none(mac_c2s),
        'mac_s2c': first_or_none(mac_s2c),
        'enc_c2s': first_or_none(enc_c2s),
        'enc_s2c': first_or_none(enc_s2c),
    }
def extract_ssh_packets(pcap_path, output_dir):
    """Split the first TCP stream of a capture into individual SSH packets.

    ``tshark`` is invoked to dump every data-carrying segment of TCP stream 0
    as ``frame|srcport|dstport|payload-hex``.  Each segment's payload is then
    cut into an optional version banner followed by SSH binary packets.

    Args:
        pcap_path: Capture file handed to ``tshark -r``.
        output_dir: Unused; kept for interface compatibility.

    Returns:
        A list of packet dicts in capture order.  Banner entries carry
        ``frame``, ``direction``, ``data``, ``is_banner`` and ``packet_type``;
        the remaining entries are whatever ``parse_ssh_packets`` produces.
        An empty list is returned when the stream holds no payload.

    Raises:
        RuntimeError: tshark exited with a non-zero status and produced no
            usable output.

    Note:
        Every TCP segment is parsed on its own; bytes of an SSH packet that
        continue into the next segment are not reassembled.
    """
    cmd = [
        'tshark', '-r', str(pcap_path),
        '-Y', 'tcp.stream eq 0 and tcp.len > 0',
        '-T', 'fields',
        '-e', 'frame.number',
        '-e', 'tcp.srcport',
        '-e', 'tcp.dstport',
        '-e', 'tcp.payload',
        '-E', 'separator=|'
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Keep only well-formed rows.  Filtering is required because splitting an
    # empty output still yields one (empty) element, which would otherwise
    # slip past an emptiness check and crash on the port lookup below.
    rows = [
        fields
        for fields in (line.split('|') for line in result.stdout.splitlines())
        if len(fields) >= 4
    ]
    if not rows:
        if result.returncode != 0:
            raise RuntimeError(
                f"tshark exited with status {result.returncode}: "
                f"{result.stderr.strip()}"
            )
        return []

    # NOTE(review): the sender of the first data segment is taken to be the
    # client; confirm this holds for captures where the server speaks first.
    client_port = int(rows[0][1])

    packets = []
    client_mac_len = 0
    server_mac_len = 0
    newkeys_seen = {'client': False, 'server': False}

    for fields in rows:
        payload_hex = fields[3].replace(':', '').strip()
        if not payload_hex:
            continue

        frame_num = int(fields[0])
        src_port = int(fields[1])
        payload = bytes.fromhex(payload_hex)

        direction = 'client' if src_port == client_port else 'server'
        current_mac_len = client_mac_len if direction == 'client' else server_mac_len
        has_newkeys = newkeys_seen[direction]

        remaining = payload
        if payload.startswith(b'SSH-2.0'):
            banner_end = payload.find(b'\r\n')
            if banner_end == -1:
                # Banner without a CRLF terminator in this segment: nothing
                # can be delimited, so the segment is skipped.
                continue
            packets.append({
                'frame': frame_num,
                'direction': direction,
                'data': payload[:banner_end + 2],
                'is_banner': True,
                'packet_type': 'Banner'
            })
            # Binary packets may follow the banner in the same segment.
            remaining = payload[banner_end + 2:]

        if remaining:
            parsed, client_mac_len, server_mac_len, newkeys_seen = parse_ssh_packets(
                remaining, frame_num, direction, current_mac_len,
                client_mac_len, server_mac_len, newkeys_seen, has_newkeys
            )
            packets.extend(parsed)

    return packets
def parse_ssh_packets(data, frame_num, direction, mac_len, client_mac_len, server_mac_len, newkeys_seen, has_newkeys):
    """Cut one TCP payload into SSH binary packets.

    Each packet is ``4-byte length | body | optional MAC``.  A MAC of
    ``mac_len`` bytes is expected on packets that follow a NEWKEYS message
    from the same sender (either in an earlier segment, signalled by
    ``has_newkeys``, or earlier in this payload).

    Args:
        data: Raw bytes of the segment (after any version banner).
        frame_num: Capture frame number recorded on every produced packet.
        direction: ``'client'`` or anything else (treated as the server).
        mac_len: MAC length currently in effect for this sender.
        client_mac_len: MAC length to apply to the client after its NEWKEYS.
        server_mac_len: MAC length to apply to the server after its NEWKEYS.
        newkeys_seen: Dict with ``'client'``/``'server'`` flags; updated in
            place when a NEWKEYS message is found.
        has_newkeys: Whether this sender already sent NEWKEYS before this
            payload.

    Returns:
        ``(packets, client_mac_len, server_mac_len, newkeys_seen)`` where
        ``packets`` is a list of dicts with keys ``frame``, ``direction``,
        ``data``, ``is_banner``, ``packet_type`` and ``mac_len``.

    Note:
        Parsing stops at the first packet that does not fit in *data*; the
        leftover bytes are dropped.  Message codes are read from byte 5 of
        each packet, which is only meaningful while packets are unencrypted.
        NOTE(review): the MAC length is taken from the sender's own
        first-listed MAC in its KEXINIT rather than from a negotiation of
        both sides' lists -- confirm this is adequate for the captures used.
    """
    packets = []
    offset = 0
    mac_active = has_newkeys

    # At least the length field plus one more byte must remain.
    while len(data) - offset >= 5:
        packet_length = struct.unpack_from('>I', data, offset)[0]
        mac_size = mac_len if mac_active else 0
        total_size = 4 + packet_length + mac_size

        if offset + total_size > len(data):
            if offset + 4 + packet_length > len(data):
                break
            # The packet fits but its MAC does not: emit it without a MAC.
            total_size = 4 + packet_length
            mac_size = 0

        packet_data = data[offset:offset + total_size]
        packet_type = 'SSH Packet'

        if len(packet_data) >= 6:
            msg_code = packet_data[5]
            if msg_code == 20:
                packet_type = 'KEXINIT'
                # MAC detection is best-effort: a malformed KEXINIT must not
                # abort extraction, but interpreter-level signals such as
                # KeyboardInterrupt are no longer swallowed.
                try:
                    kex_info = parse_kexinit(packet_data)
                except Exception:
                    kex_info = {}
                if direction == 'client':
                    mac_name = kex_info.get('mac_c2s')
                    if mac_name and mac_name in MAC_LENGTHS:
                        client_mac_len = MAC_LENGTHS[mac_name]
                else:
                    mac_name = kex_info.get('mac_s2c')
                    if mac_name and mac_name in MAC_LENGTHS:
                        server_mac_len = MAC_LENGTHS[mac_name]
            elif msg_code == 21:
                packet_type = 'NEWKEYS'
                newkeys_seen[direction] = True
                # Takes effect from the next packet on; NEWKEYS itself was
                # sized above with the previous MAC setting.
                mac_active = True
                mac_len = client_mac_len if direction == 'client' else server_mac_len

        packets.append({
            'frame': frame_num,
            'direction': direction,
            'data': packet_data,
            'is_banner': False,
            'packet_type': packet_type,
            'mac_len': mac_size
        })
        offset += total_size

    return packets, client_mac_len, server_mac_len, newkeys_seen
def main(
    pcap_path='/Users/pierce/code/metalssh/metalssh/testdata/none-exec/none-exec.pcapng',
    output_dir='/Users/pierce/code/metalssh/metalssh/testdata/none-exec',
):
    """Extract the SSH packets of a capture into numbered ``.bin`` files.

    Existing ``*.bin`` files in *output_dir* are deleted, every extracted
    packet is written as ``NN-<direction>[-<type>].bin``, and a Markdown
    table describing the packets is written to ``<capture-stem>-packets.md``
    in the same directory.  Progress is printed to stdout.

    Args:
        pcap_path: Capture file to read.  Defaults to the ``none-exec``
            test capture.
        output_dir: Directory receiving the ``.bin`` files and the table;
            created if it does not exist.
    """
    pcap_path = Path(pcap_path)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Remove the results of a previous run so stale files do not linger.
    for old_file in output_dir.glob('*.bin'):
        old_file.unlink()

    packets = extract_ssh_packets(str(pcap_path), output_dir)
    packets.sort(key=lambda p: p['frame'])

    client_banner = None
    server_banner = None
    regular_packets = []
    for p in packets:
        if not p.get('is_banner'):
            regular_packets.append(p)
        elif p['direction'] == 'client':
            client_banner = p
        else:
            server_banner = p

    # Banners always lead the output, ordered by frame.  The server banner is
    # listed first so that the stable sort keeps it first on equal frames.
    banners = [b for b in (server_banner, client_banner) if b]
    banners.sort(key=lambda b: b['frame'])
    final_packets = banners + regular_packets

    # File-name suffix per SSH message code (byte 5 of the packet).  Only
    # meaningful while that byte is not encrypted.
    msg_suffixes = {
        1: '-disconnect',
        2: '-ignore',
        3: '-unimplemented',
        4: '-debug',
        5: '-service_request',
        6: '-service_accept',
        7: '-ext_info',
        20: '-kexinit',
        21: '-newkeys',
        30: '-kexdh_init',
        31: '-kexdh_reply',
        50: '-userauth_request',
        51: '-userauth_failure',
        52: '-userauth_success',
        53: '-userauth_banner',
        60: '-userauth_pk_ok',
        80: '-global_request',
        81: '-request_success',
        82: '-request_failure',
        90: '-channel_open',
        91: '-channel_open_confirmation',
        92: '-channel_open_failure',
        93: '-channel_window_adjust',
        94: '-channel_data',
        95: '-channel_extended_data',
        96: '-channel_eof',
        97: '-channel_close',
        98: '-channel_request',
        99: '-channel_success',
        100: '-channel_failure',
    }

    table_rows = [
        '| Packet # | File | Direction | Frame # | Size (bytes) | Type | MAC Length |',
        '|----------|------|-----------|---------|--------------|------|------------|',
    ]

    for idx, packet in enumerate(final_packets):
        direction = packet['direction']
        frame = packet['frame']
        data = packet['data']
        packet_type = packet.get('packet_type', 'SSH Packet')
        mac_len = packet.get('mac_len', 0)

        if packet.get('is_banner'):
            type_suffix = '-banner'
        elif packet_type == 'KEXINIT':
            type_suffix = '-kexinit'
        elif packet_type == 'NEWKEYS':
            type_suffix = '-newkeys'
        elif len(data) >= 6:
            type_suffix = msg_suffixes.get(data[5], '')
        else:
            type_suffix = ''

        filename = f"{idx:02d}-{direction}{type_suffix}.bin"
        with open(output_dir / filename, 'wb') as f:
            f.write(data)

        mac_info = f"{mac_len}" if mac_len > 0 else "0"
        table_rows.append(f"| {idx} | `{filename}` | {direction} | {frame} | {len(data)} | {packet_type} | {mac_info} |")
        print(f"Wrote {filename} ({len(data)} bytes, frame {frame}, type: {packet_type})")

    table_path = output_dir / f'{pcap_path.stem}-packets.md'
    report = ''.join([
        f'# SSH Packets from {pcap_path.name}\n\n',
        'Extracted SSH packets in chronological order.\n\n',
        '**Note:** Each .bin file contains a complete SSH packet including:\n',
        '- Packet length field (4 bytes)\n',
        '- Padding length (1 byte)\n',
        '- Payload (variable)\n',
        '- Padding (variable)\n',
        '- MAC (variable, only present after NEWKEYS)\n\n',
        '\n'.join(table_rows),
        '\n',
    ])
    with open(table_path, 'w', encoding='utf-8') as f:
        f.write(report)

    print(f"\nWrote packet table to {table_path}")
    print(f"Total packets extracted: {len(final_packets)}")
# Run the extraction only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()