import gdb
import random
# Seed handed to DeterministicExecutor at the bottom of this script.
seed = 156112673742

# Thread numbers the executor waits for at the entry point and later
# chooses between when scheduling.
threads_whitelist = {2, 3}

# Program loaded into GDB with the "file" command.
filename = "target/debug/binary"

# Location where every run first stops so the threads can rendezvous.
entrypoint = "src/main.rs:8"

# Location that marks a thread as finished once it is reached.
exitpoint = "src/main.rs:12"

# Locations that must never be hit; reaching one prints the seed and quits.
unreachable = ["panic_unwind::imp::panic"]

# Locations where a thread is paused so another scheduling choice is made.
interesting = ["src/main.rs:8", "src/main.rs:9"]
# Ask GDB to echo every command it executes, so the commands this script
# issues through gdb.execute() show up in the session output.
gdb.execute("set trace-commands on")
class UnreachableBreakpoint(gdb.Breakpoint):
    """Breakpoint placed on each location listed in ``unreachable``.

    The subclass adds no behaviour of its own; it only tags the breakpoint
    so ``DeterministicExecutor.scheduler_callback`` can recognise it with
    ``isinstance`` and report the seed before quitting GDB.
    """
class DoneBreakpoint(gdb.Breakpoint):
    """Breakpoint placed on ``exitpoint``.

    The subclass adds no behaviour of its own; it only tags the breakpoint
    so ``DeterministicExecutor.scheduler_callback`` can recognise it with
    ``isinstance`` and record the stopping thread as finished.
    """
class InterestingBreakpoint(gdb.Breakpoint):
    """Breakpoint placed on each location listed in ``interesting``.

    The subclass adds no behaviour of its own. Hitting one falls through to
    the default branch of ``DeterministicExecutor.scheduler_callback``,
    which logs the stop and then picks the next thread to continue.
    """
class DeterministicExecutor:
    """Run the target under GDB with a seeded, pseudo-random thread schedule.

    Each run stops the whitelisted threads at ``entrypoint``, then repeatedly
    picks one runnable thread with ``random.choice`` and continues it until
    it hits the next breakpoint. Because every choice comes from the
    ``random`` module, the seed printed by this class identifies the
    sequence of choices that was made.

    Attributes:
        seed: Seed currently installed in the ``random`` module.
        ready: Numbers of the threads that have stopped during rendezvous.
        finished: Numbers of the threads that have hit ``DoneBreakpoint``.
    """

    def __init__(self, seed=None):
        """Seed the RNG, load the target binary and configure GDB.

        Args:
            seed: Integer seed to use. ``None`` draws a fresh seed via
                ``reseed()``. An explicit ``0`` is honoured as a seed.
        """
        # Compare against None rather than using truthiness so that a seed
        # of 0 (which reseed() itself can produce) is not silently replaced.
        if seed is not None:
            print("seeding with", seed)
            self.seed = seed
            random.seed(seed)
        else:
            self.reseed()

        gdb.execute("file " + filename)
        gdb.execute("set non-stop on")
        gdb.execute("set confirm off")

        self.ready = set()
        self.finished = set()

    def reseed(self):
        """Draw a new seed, remember it in ``self.seed`` and install it."""
        # Re-seed without an argument first so the new seed does not depend
        # on the previous one.
        random.seed()
        # randrange() requires an integer bound: a float such as 1e12 is
        # deprecated since Python 3.10 and raises TypeError from 3.12 on.
        self.seed = random.randrange(10 ** 12)
        print("reseeding with", self.seed)
        random.seed(self.seed)

    def restart(self):
        """Drop all per-run state, kill the inferior and run with a new seed."""
        self.ready = set()
        self.finished = set()

        gdb.events.stop.disconnect(self.scheduler_callback)
        gdb.events.exited.disconnect(self.exit_callback)

        gdb.execute("d")  # delete all breakpoints
        gdb.execute("k")  # kill the inferior

        self.reseed()
        self.run()

    def rendezvous_callback(self, event):
        """Stop handler used until every whitelisted thread has stopped.

        Records the stopping thread and switches to scheduling once as many
        threads have stopped as ``threads_whitelist`` contains. Exceptions
        are printed rather than propagated.
        """
        try:
            self.ready.add(event.inferior_thread.num)
            if len(self.ready) == len(threads_whitelist):
                self.run_schedule()
        except Exception as e:
            print(e)

    def run(self):
        """Break at ``entrypoint``, hook up the handlers and start the program."""
        gdb.execute("b " + entrypoint)
        gdb.events.stop.connect(self.rendezvous_callback)
        gdb.events.exited.connect(self.exit_callback)
        gdb.execute("r")

    def run_schedule(self):
        """Switch from rendezvous to scheduling and make the first pick."""
        print("running schedule")

        # Remove the entry breakpoint before installing the scheduling ones.
        gdb.execute("d")
        gdb.events.stop.disconnect(self.rendezvous_callback)
        gdb.events.stop.connect(self.scheduler_callback)

        for location in interesting:
            InterestingBreakpoint(location)
        for location in unreachable:
            UnreachableBreakpoint(location)
        DoneBreakpoint(exitpoint)

        self.pick()

    def pick(self):
        """Continue one randomly chosen runnable thread.

        Restarts the whole execution with a new seed when no whitelisted
        thread is left to run.
        """
        candidates = self.runnable_threads()
        if not candidates:
            print("restarting execution after running out of valid threads")
            self.restart()
            return

        chosen = random.choice(candidates)
        gdb.execute("t " + str(chosen.num))  # select the thread
        gdb.execute("c")  # continue it

    def scheduler_callback(self, event):
        """Stop handler used while scheduling.

        Marks threads that hit ``DoneBreakpoint`` as finished, reports the
        seed and quits on an ``UnreachableBreakpoint``, logs any other
        breakpoint, and then picks the next thread. Stop events that are
        not breakpoint events are logged and otherwise ignored.
        """
        if not isinstance(event, gdb.BreakpointEvent):
            print("WTF sched callback got", event.__dict__)
            return

        if isinstance(event.breakpoint, DoneBreakpoint):
            self.finished.add(event.inferior_thread.num)
        elif isinstance(event.breakpoint, UnreachableBreakpoint):
            print("!" * 80)
            print("unreachable breakpoint triggered with seed", self.seed)
            print("!" * 80)
            # Detach the exit handler before quitting so it is not invoked
            # while GDB shuts down.
            gdb.events.exited.disconnect(self.exit_callback)
            gdb.execute("q")
        else:
            print("thread", event.inferior_thread.num,
                  "hit breakpoint at", event.breakpoint.location)

        self.pick()

    def runnable_threads(self):
        """Return the whitelisted, live, unfinished threads sorted by number.

        Sorting makes the candidate order independent of the order in which
        GDB lists the threads, so ``random.choice`` sees the same list for
        the same set of threads.
        """
        candidates = [
            thread
            for thread in gdb.selected_inferior().threads()
            if thread.is_valid()
            and not thread.is_exited()
            and thread.num in threads_whitelist
            and thread.num not in self.finished
        ]
        return sorted(candidates, key=lambda thread: thread.num)

    def exit_callback(self, event):
        """Exit handler: report a non-zero exit code, otherwise restart.

        After either branch GDB is asked to quit. Exceptions are printed
        rather than propagated.
        """
        # NOTE(review): the branch layout below (restart inside ``else``,
        # quit after the if/else) was reconstructed from a copy of the file
        # without indentation - confirm against the intended control flow.
        try:
            if event.exit_code != 0:
                print("!" * 80)
                print("interesting exit with seed", self.seed)
                print("!" * 80)
            else:
                print("happy exit")
                self.restart()

            gdb.execute("q")
        except Exception as e:
            # Report the failure the same way rendezvous_callback does
            # instead of discarding it silently.
            print(e)
# Build the executor with the seed configured at the top of this script
# and start the first run.
de = DeterministicExecutor(seed)
de.run()