from __future__ import print_function
import time
__author__ = 'Daniel Lindsley'
__license__ = 'BSD'
class Strength(object):
REQUIRED = None
STRONG_PREFERRED = None
PREFERRED = None
STRONG_DEFAULT = None
NORMAL = None
WEAK_DEFAULT = None
WEAKEST = None
def __init__(self, strength, name):
super(Strength, self).__init__()
self.strength = strength
self.name = name
@classmethod
def stronger(cls, s1, s2):
return s1.strength < s2.strength
@classmethod
def weaker(cls, s1, s2):
return s1.strength > s2.strength
@classmethod
def weakest_of(cls, s1, s2):
if cls.weaker(s1, s2):
return s1
return s2
@classmethod
def strongest(cls, s1, s2):
if cls.stronger(s1, s2):
return s1
return s2
def next_weaker(self):
strengths = {
0: self.__class__.WEAKEST,
1: self.__class__.WEAK_DEFAULT,
2: self.__class__.NORMAL,
3: self.__class__.STRONG_DEFAULT,
4: self.__class__.PREFERRED,
5: self.__class__.REQUIRED,
}
return strengths[self.strength]
Strength.REQUIRED = Strength(0, "required")
Strength.STONG_PREFERRED = Strength(1, "strongPreferred")
Strength.PREFERRED = Strength(2, "preferred")
Strength.STRONG_DEFAULT = Strength(3, "strongDefault")
Strength.NORMAL = Strength(4, "normal")
Strength.WEAK_DEFAULT = Strength(5, "weakDefault")
Strength.WEAKEST = Strength(6, "weakest")
class Constraint(object):
def __init__(self, strength):
super(Constraint, self).__init__()
self.strength = strength
def add_constraint(self):
global planner
self.add_to_graph()
planner.incremental_add(self)
def satisfy(self, mark):
global planner
self.choose_method(mark)
if not self.is_satisfied():
if self.strength == Strength.REQUIRED:
print('Could not satisfy a required constraint!')
return None
self.mark_inputs(mark)
out = self.output()
overridden = out.determined_by
if overridden is not None:
overridden.mark_unsatisfied()
out.determined_by = self
if not planner.add_propagate(self, mark):
print('Cycle encountered')
out.mark = mark
return overridden
def destroy_constraint(self):
global planner
if self.is_satisfied():
planner.incremental_remove(self)
else:
self.remove_from_graph()
def is_input(self):
return False
class UrnaryConstraint(Constraint):
def __init__(self, v, strength):
super(UrnaryConstraint, self).__init__(strength)
self.my_output = v
self.satisfied = False
self.add_constraint()
def add_to_graph(self):
self.my_output.add_constraint(self)
self.satisfied = False
def choose_method(self, mark):
if self.my_output.mark != mark and \
Strength.stronger(self.strength, self.my_output.walk_strength):
self.satisfied = True
else:
self.satisfied = False
def is_satisfied(self):
return self.satisfied
def mark_inputs(self, mark):
pass
def output(self):
return self.my_output
def recalculate(self):
self.my_output.walk_strength = self.strength
self.my_output.stay = not self.is_input()
if self.my_output.stay:
self.execute()
def mark_unsatisfied(self):
self.satisfied = False
def inputs_known(self, mark):
return True
def remove_from_graph(self):
if self.my_output is not None:
self.my_output.remove_constraint(self)
self.satisfied = False
class StayConstraint(UrnaryConstraint):
def __init__(self, v, string):
super(StayConstraint, self).__init__(v, string)
def execute(self):
pass
class EditConstraint(UrnaryConstraint):
def __init__(self, v, string):
super(EditConstraint, self).__init__(v, string)
def is_input(self):
return True
def execute(self):
pass
class Direction(object):
NONE = 0
FORWARD = 1
BACKWARD = -1
class BinaryConstraint(Constraint):
def __init__(self, v1, v2, strength):
super(BinaryConstraint, self).__init__(strength)
self.v1 = v1
self.v2 = v2
self.direction = Direction.NONE
self.add_constraint()
def choose_method(self, mark):
if self.v1.mark == mark:
if self.v2.mark != mark and Strength.stronger(self.strength, self.v2.walk_strength):
self.direction = Direction.FORWARD
else:
self.direction = Direction.BACKWARD
if self.v2.mark == mark:
if self.v1.mark != mark and Strength.stronger(self.strength, self.v1.walk_strength):
self.direction = Direction.BACKWARD
else:
self.direction = Direction.NONE
if Strength.weaker(self.v1.walk_strength, self.v2.walk_strength):
if Strength.stronger(self.strength, self.v1.walk_strength):
self.direction = Direction.BACKWARD
else:
self.direction = Direction.NONE
else:
if Strength.stronger(self.strength, self.v2.walk_strength):
self.direction = Direction.FORWARD
else:
self.direction = Direction.BACKWARD
def add_to_graph(self):
self.v1.add_constraint(self)
self.v2.add_constraint(self)
self.direction = Direction.NONE
def is_satisfied(self):
return self.direction != Direction.NONE
def mark_inputs(self, mark):
self.input().mark = mark
def input(self):
if self.direction == Direction.FORWARD:
return self.v1
return self.v2
def output(self):
if self.direction == Direction.FORWARD:
return self.v2
return self.v1
def recalculate(self):
ihn = self.input()
out = self.output()
out.walk_strength = Strength.weakest_of(self.strength, ihn.walk_strength)
out.stay = ihn.stay
if out.stay:
self.execute()
def mark_unsatisfied(self):
self.direction = Direction.NONE
def inputs_known(self, mark):
i = self.input()
return i.mark == mark or i.stay or i.determined_by == None
def remove_from_graph(self):
if self.v1 is not None:
self.v1.remove_constraint(self)
if self.v2 is not None:
self.v2.remove_constraint(self)
self.direction = Direction.NONE
class ScaleConstraint(BinaryConstraint):
def __init__(self, src, scale, offset, dest, strength):
self.direction = Direction.NONE
self.scale = scale
self.offset = offset
super(ScaleConstraint, self).__init__(src, dest, strength)
def add_to_graph(self):
super(ScaleConstraint, self).add_to_graph()
self.scale.add_constraint(self)
self.offset.add_constraint(self)
def remove_from_graph(self):
super(ScaleConstraint, self).remove_from_graph()
if self.scale is not None:
self.scale.remove_constraint(self)
if self.offset is not None:
self.offset.remove_constraint(self)
def mark_inputs(self, mark):
super(ScaleConstraint, self).mark_inputs(mark)
self.scale.mark = mark
self.offset.mark = mark
def execute(self):
if self.direction == Direction.FORWARD:
self.v2.value = self.v1.value * self.scale.value + self.offset.value
else:
self.v1.value = (self.v2.value - self.offset.value) / self.scale.value
def recalculate(self):
ihn = self.input()
out = self.output()
out.walk_strength = Strength.weakest_of(self.strength, ihn.walk_strength)
out.stay = ihn.stay and self.scale.stay and self.offset.stay
if out.stay:
self.execute()
class EqualityConstraint(BinaryConstraint):
def execute(self):
self.output().value = self.input().value
class Variable(object):
def __init__(self, name, initial_value=0):
super(Variable, self).__init__()
self.name = name
self.value = initial_value
self.constraints = []
self.determined_by = None
self.mark = 0
self.walk_strength = Strength.WEAKEST
self.stay = True
def __repr__(self):
return '<Variable: %s - %s>' % (
self.name,
self.value
)
def add_constraint(self, constraint):
self.constraints.append(constraint)
def remove_constraint(self, constraint):
self.constraints.remove(constraint)
if self.determined_by == constraint:
self.determined_by = None
class Planner(object):
def __init__(self):
super(Planner, self).__init__()
self.current_mark = 0
def incremental_add(self, constraint):
mark = self.new_mark()
overridden = constraint.satisfy(mark)
while overridden is not None:
overridden = overridden.satisfy(mark)
def incremental_remove(self, constraint):
out = constraint.output()
constraint.mark_unsatisfied()
constraint.remove_from_graph()
unsatisfied = self.remove_propagate_from(out)
strength = Strength.REQUIRED
repeat = True
while repeat:
for u in unsatisfied:
if u.strength == strength:
self.incremental_add(u)
strength = strength.next_weaker()
repeat = strength != Strength.WEAKEST
def new_mark(self):
self.current_mark += 1
return self.current_mark
def make_plan(self, sources):
mark = self.new_mark()
plan = Plan()
todo = sources
while len(todo):
c = todo.pop(0)
if c.output().mark != mark and c.inputs_known(mark):
plan.add_constraint(c)
c.output().mark = mark
self.add_constraints_consuming_to(c.output(), todo)
return plan
def extract_plan_from_constraints(self, constraints):
sources = []
for c in constraints:
if c.is_input() and c.is_satisfied():
sources.append(c)
return self.make_plan(sources)
def add_propagate(self, c, mark):
todo = []
todo.append(c)
while len(todo):
d = todo.pop(0)
if d.output().mark == mark:
self.incremental_remove(c)
return False
d.recalculate()
self.add_constraints_consuming_to(d.output(), todo)
return True
def remove_propagate_from(self, out):
out.determined_by = None
out.walk_strength = Strength.WEAKEST
out.stay = True
unsatisfied = []
todo = []
todo.append(out)
while len(todo):
v = todo.pop(0)
for c in v.constraints:
if not c.is_satisfied():
unsatisfied.append(c)
determining = v.determined_by
for c in v.constraints:
if c != determining and c.is_satisfied():
c.recalculate()
todo.append(c.output())
return unsatisfied
def add_constraints_consuming_to(self, v, coll):
determining = v.determined_by
cc = v.constraints
for c in cc:
if c != determining and c.is_satisfied():
coll.append(c)
class Plan(object):
def __init__(self):
super(Plan, self).__init__()
self.v = []
def add_constraint(self, c):
self.v.append(c)
def __len__(self):
return len(self.v)
def __getitem__(self, index):
return self.v[index]
def execute(self):
for c in self.v:
c.execute()
total = 0
def chain_test(n):
global planner
global total
planner = Planner()
prev, first, last = None, None, None
for i in range(n + 1):
name = "v%s" % i
v = Variable(name)
if prev is not None:
EqualityConstraint(prev, v, Strength.REQUIRED)
if i == 0:
first = v
if i == n:
last = v
prev = v
StayConstraint(last, Strength.STRONG_DEFAULT)
edit = EditConstraint(first, Strength.PREFERRED)
edits = []
edits.append(edit)
plan = planner.extract_plan_from_constraints(edits)
for i in range(100):
first.value = i
plan.execute()
total += int(last.value)
if last.value != i:
print("Chain test failed.")
def projection_test(n):
global planner
global total
planner = Planner()
scale = Variable("scale", 10)
offset = Variable("offset", 1000)
src, dest = None, None
dests = []
for i in range(n):
src = Variable("src%s" % i, i)
dst = Variable("dst%s" % i, i)
dests.append(dst)
StayConstraint(src, Strength.NORMAL)
ScaleConstraint(src, scale, offset, dst, Strength.REQUIRED)
change(src, 17)
total += int(dst.value)
if dst.value != 1170:
print("Projection 1 failed")
change(dst, 1050)
total += int(src.value)
if src.value != 5:
print("Projection 2 failed")
change(scale, 5)
for i in range(n - 1):
total += int(dests[i].value)
if dests[i].value != (i * 5 + 1000):
print("Projection 3 failed")
change(offset, 2000)
for i in range(n - 1):
total += int(dests[i].value)
if dests[i].value != (i * 5 + 2000):
print("Projection 4 failed")
def change(v, new_value):
global planner
edit = EditConstraint(v, Strength.PREFERRED)
edits = []
edits.append(edit)
plan = planner.extract_plan_from_constraints(edits)
for i in range(10):
v.value = new_value
plan.execute()
edit.destroy_constraint()
planner = None
def delta_blue():
global total
start = time.clock()
for i in range(40):
chain_test(100)
projection_test(100)
print(total)
print("elapsed: " + str(time.clock() - start))
if __name__ == '__main__':
delta_blue()