import secrets
import random
from typing import List, Tuple, NewType, Dict
TMatrix = List[List[float]]
TVector = List[float]
def eigentrust_central(num_peers: int, local_trust_matrix: TMatrix, initial_trust_scores: TVector):
delta = 0.01
def transpose(t: TMatrix) -> TMatrix:
return [
[ t[j][i] for j in range(len(row))]
for i, row in enumerate(t)
]
def get_matrix_size(a: TMatrix) -> Tuple[int, int]:
m = len(a)
if m == 0:
raise ValueError
n = len(a[0])
if n == 0:
raise ValueError
return (m, n)
def matrix_mul(a: TMatrix, b: TMatrix) -> TMatrix:
assert is_matrix_multipiable(a, b)
m, na = get_matrix_size(a)
_, l = get_matrix_size(b)
return [
[sum([a[i][k] * b[k][j] for k in range(na)]) for j in range(l)]
for i in range(m)
]
def is_matrix_multipiable(a: TMatrix, b: TMatrix):
_, na = get_matrix_size(a)
nb, _ = get_matrix_size(b)
return na == nb
def get_vector_distance(a: TMatrix, b: TMatrix):
if len(b) != len(a):
raise ValueError
s = 0
for index, row_a in enumerate(a):
row_b = b[index]
if len(row_a) != 1 or len(row_b) != 1:
raise ValueError
s += pow(row_a[0] - row_b[0], 2)
return pow(s, 0.5)
ct = transpose(local_trust_matrix)
t_matrix: TMatrix = [[i] for i in initial_trust_scores]
while 1:
new_t = matrix_mul(ct, t_matrix)
if get_vector_distance(t_matrix, new_t) < delta:
break
t_matrix = new_t
return [row[0] for row in new_t]
def eigentrust_distributed(num_peers: int, local_trust_matrix: TMatrix, initial_trust_scores: TVector):
delta = 0.001
TPeerScore = NewType('TPeerScore', float)
TPeerIndex = NewType('TPeerIndex', int)
class Peer:
index: TPeerIndex
neighbors: Dict[TPeerIndex, 'Peer']
local_trust_values: Dict[TPeerIndex, TPeerScore]
ti: TPeerScore
last_cij_ti: Dict[TPeerIndex, TPeerScore]
is_converged: bool
def __init__(self, index: TPeerIndex) -> None:
self.index = index
self.neighbors = {}
self.local_trust_values = {}
self.last_cij_ti = {}
self.ti = 0
self.is_converged = False
def add_neighbor(self, peer: 'Peer', local_trust_value: TPeerScore, pretrust_score: TPeerScore) -> None:
self.neighbors[peer.index] = peer
self.local_trust_values[peer.index] = local_trust_value
self.last_cij_ti[peer.index] = pretrust_score
def heartbeat(self) -> None:
if self.is_converged:
return
new_ti = TPeerScore(0.0)
for j, neighbor_j in self.neighbors.items():
if self.index not in neighbor_j.last_cij_ti:
continue
new_ti += TPeerScore(neighbor_j.last_cij_ti[self.index])
for j in self.neighbors.keys():
self.last_cij_ti[j] = TPeerScore(self.local_trust_values[j] * new_ti)
if abs(new_ti - self.ti) <= delta:
self.is_converged = True
self.ti = new_ti
class Network:
size: TPeerIndex
peers: Tuple['Peer', ...]
is_converged: bool
def __init__(self, size: TPeerIndex) -> None:
self.size = size
self.peers = self._gen_peers(size)
self.is_converged = False
def _gen_peers(self, size: TPeerIndex) -> Tuple['Peer', ...]:
return tuple(
Peer(TPeerIndex(i)) for i in range(size)
)
def _connect_peers(self, local_trust_matrix: TMatrix) -> None:
for i, c_i in enumerate(local_trust_matrix):
for j, c_ij in enumerate(c_i):
if i == j:
continue
self.peers[i].add_neighbor(self.peers[j], c_ij, initial_trust_scores[j])
def tick(self):
peer_list = list(self.peers)
is_not_all_converged = True
for peer in peer_list:
peer.heartbeat()
is_not_all_converged = is_not_all_converged and peer.is_converged
self.is_converged = is_not_all_converged
def get_global_trust_scores(self) -> TVector:
t = sum([peer.ti for peer in self.peers])
return [peer.ti / t for peer in self.peers]
network = Network(num_peers)
network._connect_peers(local_trust_matrix)
while not network.is_converged:
network.tick()
return network.get_global_trust_scores()
def gen_trust_matrix(num_peers: int) -> TMatrix:
mat = []
for i in range(num_peers):
random_ints = [secrets.randbelow(32) for _ in range(num_peers - 1)]
s = sum(random_ints)
normalized = [j / s for j in random_ints]
normalized.insert(i, 0)
mat.append(normalized)
return mat
def print_matrix(c: TMatrix):
print('=' * (len(c) * 20))
for row in c:
print(',\t'.join([str(i) for i in row]))
print('=' * (len(c) * 20))
def main():
num_peers_str = input('Enter number of peers\n')
num_peers = int(num_peers_str)
initial_trust_scores = [1/num_peers for i in range(num_peers)]
print('initial_trust_scores = ', initial_trust_scores)
c = [
[0., 0., 0.9333333333333333, 0.06666666666666667],
[0.4727272727272727, 0., 0.03636363636363636, 0.4909090909090909],
[0.3695652173913043, 0.3695652173913043, 0., 0.2608695652173913],
[0.15625, 0.4375, 0.40625, 0.],
]
print('number of peers = ', num_peers)
print('local reputation matrix is generated randomly:')
print_matrix(c)
res_central = eigentrust_central(num_peers, c, initial_trust_scores)
res_distributed = eigentrust_distributed(num_peers, c, initial_trust_scores)
print(f'res_central\t= {res_central}')
print(f'res_distributed\t= {res_distributed}')
main()