import random
import unittest
GF2_MODULI = [
None, None,
2**2 + 2**1 + 1,
2**3 + 2**1 + 1,
2**4 + 2**1 + 1,
2**5 + 2**2 + 1,
2**6 + 2**1 + 1,
2**7 + 2**1 + 1,
2**8 + 2**4 + 2**3 + 2**1 + 1,
2**9 + 2**1 + 1,
2**10 + 2**3 + 1,
2**11 + 2**2 + 1,
2**12 + 2**3 + 1,
2**13 + 2**4 + 2**3 + 2**1 + 1,
2**14 + 2**5 + 1,
2**15 + 2**1 + 1,
2**16 + 2**5 + 2**3 + 2**1 + 1,
2**17 + 2**3 + 1,
2**18 + 2**3 + 1,
2**19 + 2**5 + 2**2 + 2**1 + 1,
2**20 + 2**3 + 1,
2**21 + 2**2 + 1,
2**22 + 2**1 + 1,
2**23 + 2**5 + 1,
2**24 + 2**4 + 2**3 + 2**1 + 1,
2**25 + 2**3 + 1,
2**26 + 2**4 + 2**3 + 2**1 + 1,
2**27 + 2**5 + 2**2 + 2**1 + 1,
2**28 + 2**1 + 1,
2**29 + 2**2 + 1,
2**30 + 2**1 + 1,
2**31 + 2**3 + 1,
2**32 + 2**7 + 2**3 + 2**2 + 1,
2**33 + 2**10 + 1,
2**34 + 2**7 + 1,
2**35 + 2**2 + 1,
2**36 + 2**9 + 1,
2**37 + 2**6 + 2**4 + 2**1 + 1,
2**38 + 2**6 + 2**5 + 2**1 + 1,
2**39 + 2**4 + 1,
2**40 + 2**5 + 2**4 + 2**3 + 1,
2**41 + 2**3 + 1,
2**42 + 2**7 + 1,
2**43 + 2**6 + 2**4 + 2**3 + 1,
2**44 + 2**5 + 1,
2**45 + 2**4 + 2**3 + 2**1 + 1,
2**46 + 2**1 + 1,
2**47 + 2**5 + 1,
2**48 + 2**5 + 2**3 + 2**2 + 1,
2**49 + 2**9 + 1,
2**50 + 2**4 + 2**3 + 2**2 + 1,
2**51 + 2**6 + 2**3 + 2**1 + 1,
2**52 + 2**3 + 1,
2**53 + 2**6 + 2**2 + 2**1 + 1,
2**54 + 2**9 + 1,
2**55 + 2**7 + 1,
2**56 + 2**7 + 2**4 + 2**2 + 1,
2**57 + 2**4 + 1,
2**58 + 2**19 + 1,
2**59 + 2**7 + 2**4 + 2**2 + 1,
2**60 + 2**1 + 1,
2**61 + 2**5 + 2**2 + 2**1 + 1,
2**62 + 2**29 + 1,
2**63 + 2**1 + 1,
2**64 + 2**4 + 2**3 + 2**1 + 1
]
class GF2Ops:
def __init__(self, field_size):
self.field_size = field_size
self._modulus = GF2_MODULI[field_size]
assert self._modulus is not None
def mul2(self, x):
x <<= 1
if x >> self.field_size:
x ^= self._modulus
return x
def mul(self, x, y):
ret = 0
while y:
if y & 1:
ret ^= x
y >>= 1
x = self.mul2(x)
return ret
def sqr(self, x):
return self.mul(x, x)
def inv(self, x):
assert x != 0
t1, t2 = 0, 1
r1, r2 = self._modulus, x
r1l, r2l = self.field_size + 1, r2.bit_length()
while r2:
q = r1l - r2l
r1 ^= r2 << q
t1 ^= t2 << q
r1l = r1.bit_length()
if r1 < r2:
t1, t2 = t2, t1
r1, r2 = r2, r1
r1l, r2l = r2l, r1l
assert r1 == 1
return t1
class TestGF2Ops(unittest.TestCase):
def field_size_test(self, field_size):
gf = GF2Ops(field_size)
for i in range(100):
x = random.randrange(1 << field_size)
y = random.randrange(1 << field_size)
x2 = gf.mul2(x)
xy = gf.mul(x, y)
self.assertEqual(x2, gf.mul(x, 2)) self.assertEqual(x2, gf.mul(2, x)) self.assertEqual(xy == 0, x == 0 or y == 0)
self.assertEqual(xy == x, y == 1 or x == 0)
self.assertEqual(xy == y, x == 1 or y == 0)
self.assertEqual(xy, gf.mul(y, x)) if i < 10:
xp = x
for _ in range(field_size):
xp = gf.sqr(xp)
self.assertEqual(xp, x) if y != 0:
yi = gf.inv(y)
self.assertEqual(y == yi, y == 1) self.assertEqual(gf.mul(y, yi), 1) yii = gf.inv(yi)
self.assertEqual(y, yii) if x != 0:
xi = gf.inv(x)
xyi = gf.inv(xy)
self.assertEqual(xyi, gf.mul(xi, yi))
def test(self):
for field_size in range(2, 65):
self.field_size_test(field_size)
def poly_monic(poly, gf):
inv = gf.inv(poly[-1])
return [gf.mul(inv, v) for v in poly]
def poly_divmod(poly, mod, gf):
assert len(mod) > 0 and mod[-1] == 1 if len(poly) < len(mod):
return ([], poly)
val = list(poly)
div = [0 for _ in range(len(val) - len(mod) + 1)]
while len(val) >= len(mod):
term = val[-1]
div[len(val) - len(mod)] = term
val.pop()
if term != 0:
for x in range(len(mod) - 1):
val[1 + x - len(mod)] ^= gf.mul(term, mod[x])
while len(val) > 0 and val[-1] == 0:
val.pop()
return div, val
def poly_gcd(a, b, gf):
if len(a) < len(b):
a, b = b, a
while len(b) > 0:
b = poly_monic(b, gf)
(_, b), a = poly_divmod(a, b, gf), b
return a
def poly_sqr(poly, gf):
if len(poly) == 0:
return []
return [0 if i & 1 else gf.sqr(poly[i // 2]) for i in range(2 * len(poly) - 1)]
def poly_tracemod(poly, param, gf):
out = [0, param]
for _ in range(gf.field_size - 1):
out = poly_sqr(out, gf)
while len(out) < 2:
out.append(0)
out[1] = param
_, out = poly_divmod(out, poly, gf)
return out
def poly_frobeniusmod(poly, gf):
out = [0, 1]
for _ in range(gf.field_size):
_, out = poly_divmod(poly_sqr(out, gf), poly, gf)
return out
def poly_find_roots(poly, gf):
assert len(poly) > 0
if len(poly) == 1:
return []
poly = poly_monic(poly, gf)
if len(poly) == 2:
return [poly[0]]
if poly_frobeniusmod(poly, gf) != [0, 1]:
return []
def rec_split(poly, randv):
assert len(poly) > 1 and poly[-1] == 1 if len(poly) == 2:
return [poly[0]]
while True:
trace = poly_tracemod(poly, randv, gf)
randv = gf.mul2(randv)
gcd = poly_gcd(trace, poly, gf)
if len(gcd) != len(poly) and len(gcd) > 1:
break
factor1 = poly_monic(gcd, gf)
factor2, _ = poly_divmod(poly, gcd, gf)
return rec_split(factor1, randv) + rec_split(factor2, randv)
return sorted(rec_split(poly, random.randrange(1, 1 << gf.field_size)))
class TestPolyFindRoots(unittest.TestCase):
def field_size_test(self, field_size):
gf = GF2Ops(field_size)
for test_size in [0, 1, 2, 3, 10]:
roots = [random.randrange(1 << field_size) for _ in range(test_size)]
roots_set = set(roots)
poly = [1]
for root in roots:
new_poly = [0] + poly
for n, c in enumerate(poly):
new_poly[n] ^= gf.mul(c, root)
poly = new_poly
found_roots = poly_find_roots(poly, gf)
if len(roots) == len(roots_set):
self.assertEqual(found_roots, sorted(roots))
else:
self.assertEqual(found_roots, [])
def test(self):
for field_size in range(2, 65):
self.field_size_test(field_size)
def berlekamp_massey(syndromes, gf):
current = [1]
prev = [1]
b_inv = 1
for n, discrepancy in enumerate(syndromes):
for i in range(1, len(current)):
discrepancy ^= gf.mul(syndromes[n - i], current[i])
if discrepancy:
x = n + 1 - (len(current) - 1) - (len(prev) - 1)
if 2 * (len(current) - 1) <= n:
tmp = list(current)
current.extend(0 for _ in range(len(prev) + x - len(current)))
mul = gf.mul(discrepancy, b_inv)
for i, v in enumerate(prev):
current[i + x] ^= gf.mul(mul, v)
prev = tmp
b_inv = gf.inv(discrepancy)
else:
mul = gf.mul(discrepancy, b_inv)
for i, v in enumerate(prev):
current[i + x] ^= gf.mul(mul, v)
return current
class Minisketch:
def __init__(self, field_size, capacity):
self.field_size = field_size
self.capacity = capacity
self.odd_syndromes = [0] * capacity
self.gf = GF2Ops(field_size)
def add(self, element):
sqr = self.gf.sqr(element)
for pos in range(self.capacity):
self.odd_syndromes[pos] ^= element
element = self.gf.mul(sqr, element)
def serialized_size(self):
return (self.capacity * self.field_size + 7) // 8
def serialize(self):
val = 0
for i in range(self.capacity):
val |= self.odd_syndromes[i] << (self.field_size * i)
return val.to_bytes(self.serialized_size(), 'little')
def deserialize(self, byte_data):
assert len(byte_data) == self.serialized_size()
val = int.from_bytes(byte_data, 'little')
for i in range(self.capacity):
self.odd_syndromes[i] = (val >> (self.field_size * i)) & ((1 << self.field_size) - 1)
def clone(self):
ret = Minisketch(self.field_size, self.capacity)
ret.odd_syndromes = list(self.odd_syndromes)
ret.gf = self.gf
return ret
def merge(self, other):
assert self.capacity == other.capacity
assert self.field_size == other.field_size
for i in range(self.capacity):
self.odd_syndromes[i] ^= other.odd_syndromes[i]
def decode(self, max_count=None):
all_syndromes = [0 for _ in range(2 * len(self.odd_syndromes))]
for i in range(len(self.odd_syndromes)):
all_syndromes[i * 2] = self.odd_syndromes[i]
all_syndromes[i * 2 + 1] = self.gf.sqr(all_syndromes[i])
poly = berlekamp_massey(all_syndromes, self.gf)
if len(poly) == 0:
return None
if len(poly) == 1:
return []
if max_count is not None and len(poly) > 1 + max_count:
return None
roots = poly_find_roots(list(reversed(poly)), self.gf)
if len(roots) == 0:
return None
return roots
class TestMinisketch(unittest.TestCase):
@classmethod
def construct_data(cls, field_size, num_a_only, num_b_only, num_both):
sample = []
for _ in range(num_a_only + num_b_only + num_both):
while True:
r = random.randrange(1, 1 << field_size)
if r not in sample:
sample.append(r)
break
full_a = sample[:num_a_only + num_both]
full_b = sample[num_a_only:]
random.shuffle(full_a)
random.shuffle(full_b)
return full_a, full_b
def field_size_capacity_test(self, field_size, capacity):
used_capacity = random.randrange(capacity + 1)
num_a = random.randrange(used_capacity + 1)
num_both = random.randrange(min(2 * capacity, (1 << field_size) - 1 - used_capacity) + 1)
full_a, full_b = self.construct_data(field_size, num_a, used_capacity - num_a, num_both)
sketch_a = Minisketch(field_size, capacity)
sketch_b = Minisketch(field_size, capacity)
for v in full_a:
sketch_a.add(v)
for v in full_b:
sketch_b.add(v)
sketch_combined = sketch_a.clone()
sketch_b_ser = sketch_b.serialize()
sketch_b_received = Minisketch(field_size, capacity)
sketch_b_received.deserialize(sketch_b_ser)
sketch_combined.merge(sketch_b_received)
decode = sketch_combined.decode()
self.assertEqual(decode, sorted(set(full_a) ^ set(full_b)))
def test(self):
for field_size in range(2, 65):
for capacity in [0, 1, 2, 5, 10, field_size]:
self.field_size_capacity_test(field_size, min(capacity, (1 << field_size) - 1))
if __name__ == '__main__':
unittest.main()