from xml.dom.minidom import parseString
import struct
import time
def hexify(s):
return s.hex().encode()
def unhexify(s):
return bytes.fromhex(s.decode())
class FakeSocket:
def __init__(self, file):
self.file = file
def send(self, data):
self.file.write(data)
def recv(self, bufsize):
return self.file.read(bufsize)
class Target:
def __init__(self, sock):
if "send" in dir(sock):
self.sock = sock
else:
self.sock = FakeSocket(sock)
self.PacketSize=0x100
def getpacket(self):
while True:
while self.sock.recv(1) != b'$':
pass
csum = 0
packet = []
while True:
c, = self.sock.recv(1)
if c == ord('#'):
break
if c == ord('$'):
packet = []
csum = 0
continue
if c == ord('}'):
c, = self.sock.recv(1)
csum += c + ord('}')
packet.append(c ^ 0x20)
continue
packet.append(c)
csum += c
if (csum & 0xFF) == int(self.sock.recv(2),16):
break
self.sock.send(b'-')
self.sock.send(b'+')
return bytes(packet)
def putpacket(self, packet):
if type(packet) == str:
packet = packet.encode()
while True:
out = []
for c in packet:
if (c in b'$#}'):
out.append(ord('}'))
out.append(c ^ 0x20)
else:
out.append(c)
csum = sum(out)
outb = b'$'+bytes(out)+b'#%02X' % (csum & 0xff)
self.sock.send(outb)
if self.sock.recv(1) == b'+':
break
def monitor(self, cmd):
if type(cmd) == str:
cmd = cmd.encode()
ret = []
self.putpacket(b"qRcmd," + hexify(cmd))
while True:
s = self.getpacket()
if s == b'':
return None
if s == b'OK':
return ret
if s.startswith(b'O'):
ret.append(unhexify(s[1:]))
else:
raise Exception('Invalid GDB stub response %r'%s.decode())
def attach(self, pid):
self.putpacket(b"vAttach;%08X" % pid)
reply = self.getpacket()
if (reply == b'') or (reply[:1] == b'E'):
raise Exception('Failed to attach to remote pid %d' % pid)
def detach(self):
self.putpacket(b"D")
if self.getpacket() != b'OK':
raise Exception("Failed to detach from remote process")
def reset(self):
self.putpacket(b"r")
def read_mem(self, addr, length):
ret = b''
while length:
packlen = min(length,self.PacketSize//2)
self.putpacket(b"m%08X,%08X" % (addr, packlen))
reply = self.getpacket()
if (reply == b'') or (reply[:1] == b'E'):
raise Exception('Error reading memory at 0x%08X' % addr)
try:
data = unhexify(reply)
except Exception:
raise Exception('Invalid response to memory read packet: %r' % reply)
ret += data
length -= packlen
addr += packlen
return ret
def write_mem(self, addr, data):
data = bytes(data)
while data:
d = data[:self.PacketSize-44]
data = data[len(d):]
pack = b"X%08X,%08X:%s" % (addr, len(d), d)
self.putpacket(pack)
if self.getpacket() != b'OK':
raise Exception('Error writing to memory at 0x%08X' % addr)
addr += len(d)
def read_regs(self):
self.putpacket(b"g")
reply = self.getpacket()
if (reply == b'') or (reply[:1] == b'E'):
raise Exception('Error reading target core registers')
try:
data = unhexify(reply)
except Exception:
raise Exception('Invalid response to registers read packet: %r' % reply)
ret = array.array('I',data)
return ret
def write_regs(self, *regs):
data = struct.pack("=%dL" % len(regs), *regs)
self.putpacket(b"G" + hexify(data))
if self.getpacket() != b'OK':
raise Exception('Error writing to target core registers')
def memmap_read(self):
offset = 0
ret = b''
while True:
self.putpacket(b"qXfer:memory-map:read::%08X,%08X" % (offset, 512))
reply = self.getpacket()
if (reply[0] in b'ml'):
offset += len(reply) - 1
ret += reply[1:]
else:
raise Exception('Invalid GDB stub response %r'%reply)
if reply[:1] == b'l':
return ret
def resume(self):
self.putpacket(b'c')
def interrupt(self):
self.sock.send(b'\x03')
def run_stub(self, stub, address, *args):
self.reset() time.sleep(0.1)
self.write_mem(address, stub)
regs = list(self.read_regs())
regs[:len(args)] = args
regs[15] = address
self.write_regs(*regs)
self.resume()
reply = None
while not reply:
reply = self.getpacket()
if not reply.startswith(b"T05"):
message = "Invalid stop response: %r" % reply
try:
message += {'T02':' (SIGINT)',
'T05':' (SIGTRAP)',
'T0B':' (SIGSEGV)',
'T1D':' (SIGLOST)'}[reply]
except KeyError:
pass
raise Exception(message)
class FlashMemory:
def __init__(self, target, offset, length, blocksize):
self.target = target
self.offset = offset
self.length = length
self.blocksize = blocksize
self.blocks = list(None for i in range(length // blocksize))
def prog(self, offset, data):
assert type(data)==bytes
assert ((offset >= self.offset) and
(offset + len(data) <= self.offset + self.length))
while data:
index = (offset - self.offset) // self.blocksize
bloffset = (offset - self.offset) % self.blocksize
bldata = data[:self.blocksize-bloffset]
data = data[len(bldata):]; offset += len(bldata)
if self.blocks[index] is None: self.blocks[index] = bytes(0xff for i in range(self.blocksize))
self.blocks[index] = (self.blocks[index][:bloffset] + bldata +
self.blocks[index][bloffset+len(bldata):])
def commit(self, progress_cb=None):
totalblocks = 0
for b in self.blocks:
if b is not None:
totalblocks += 1
block = 0
for i in range(len(self.blocks)):
block += 1
if callable(progress_cb) and totalblocks > 0:
progress_cb(block*100/totalblocks)
data = self.blocks[i]
addr = self.offset + self.blocksize * i
if data is None:
continue
self.target.putpacket(b"vFlashErase:%08X,%08X" %
(self.offset + self.blocksize*i, self.blocksize))
if self.target.getpacket() != b'OK':
raise Exception("Failed to erase flash")
while data:
d = data[0:980]
data = data[len(d):]
self.target.putpacket(b"vFlashWrite:%08X:%s" % (addr, d))
addr += len(d)
if self.target.getpacket() != b'OK':
raise Exception("Failed to write flash")
self.target.putpacket(b"vFlashDone")
if self.target.getpacket() != b'OK':
raise Exception("Failed to commit")
self.blocks = list(None for i in range(self.length // self.blocksize))
def flash_probe(self):
self.mem = []
xmldom = parseString(self.memmap_read())
for memrange in xmldom.getElementsByTagName("memory"):
if memrange.getAttribute("type") != "flash":
continue
offset = eval(memrange.getAttribute("start"))
length = eval(memrange.getAttribute("length"))
for property in memrange.getElementsByTagName("property"):
if property.getAttribute("name") == "blocksize":
blocksize = eval(property.firstChild.data)
break
mem = Target.FlashMemory(self, offset, length, blocksize)
self.mem.append(mem)
xmldom.unlink()
return self.mem
def flash_write_prepare(self, address, data):
for m in self.mem:
if (address >= m.offset) and (address + len(data) <= m.offset + m.length):
m.prog(address, data)
def flash_commit(self, progress_cb=None):
for m in self.mem:
m.commit(progress_cb)