use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{
collections::{BTreeMap, HashSet},
time::Instant,
};
use tracing::info;
use crate::runners::OutboundLocalTrust;
const PRE_TRUST_WEIGHT: f32 = 0.5;
const DELTA: f32 = 0.01;
fn find_reachable_peers(
lt: &BTreeMap<u64, OutboundLocalTrust>,
seed: &BTreeMap<u64, f32>,
) -> HashSet<u64> {
let mut to_visit: Vec<&u64> = seed.keys().collect();
let mut visited = HashSet::new();
while let Some(i) = to_visit.pop() {
if visited.contains(i) {
continue;
}
visited.insert(*i);
for (j, v) in lt.get(i).unwrap().outbound_trust_scores() {
if !visited.contains(j) && *v > 0.0 {
to_visit.push(j);
}
}
}
visited
}
fn pre_process(
lt: &mut BTreeMap<u64, OutboundLocalTrust>,
seed: &mut BTreeMap<u64, f32>,
count: u64,
) {
let sum: f32 = seed.par_iter().map(|(_, v)| v).sum();
if sum == 0.0 {
for i in 0..count {
seed.insert(i, 1.0);
}
}
for from in 0..count {
let sum = lt.get(&from).map(|lt| lt.outbound_sum()).unwrap_or(&0.0);
if *sum == 0.0 {
let single_lt = OutboundLocalTrust::from_score_map(seed);
lt.insert(from, single_lt);
}
}
let reachable = find_reachable_peers(lt, seed);
lt.retain(|from, _| reachable.contains(from));
}
fn normalise_lt(lt: &BTreeMap<u64, OutboundLocalTrust>) -> BTreeMap<u64, OutboundLocalTrust> {
lt.par_iter()
.fold(BTreeMap::new, |mut lt_norm, (from, from_map)| {
let from_map_norm = from_map.norm();
lt_norm.insert(*from, from_map_norm);
lt_norm
})
.reduce(BTreeMap::new, |mut acc, lt_norm| {
acc.extend(lt_norm);
acc
})
}
fn normalise_scores(scores: &BTreeMap<u64, f32>) -> BTreeMap<u64, f32> {
let sum: f32 = scores.par_iter().map(|(_, v)| v).sum();
scores
.par_iter()
.fold(BTreeMap::new, |mut scores, (i, value)| {
scores.insert(*i, *value / sum);
scores
})
.reduce(BTreeMap::new, |mut acc, scores| {
acc.extend(scores);
acc
})
}
pub fn positive_run(
mut lt: BTreeMap<u64, OutboundLocalTrust>,
mut seed: BTreeMap<u64, f32>,
count: u64,
) -> Vec<(u64, f32)> {
let start = Instant::now();
info!(
"PRE_PROCESS_START, LT_SIZE: {}, SEED_SIZE: {}",
lt.len(),
seed.len()
);
pre_process(&mut lt, &mut seed, count);
info!(
"PRE_PROCESS_FINISH: {:?}, LT_SIZE: {}, SEED_SIZE: {}",
start.elapsed(),
lt.len(),
seed.len()
);
info!("NORMALISE_LT_SEED");
seed = normalise_scores(&seed);
lt = normalise_lt(<);
let mut scores = seed.clone();
info!("COMPUTE_START");
let start = Instant::now();
let mut i = 0;
loop {
let n_plus_1_scores = iteration(<, &seed, &scores);
let n_plus_1_scores = normalise_scores(&n_plus_1_scores);
let n_plus_2_scores = iteration(<, &seed, &n_plus_1_scores);
let n_plus_2_scores = normalise_scores(&n_plus_2_scores);
let (is_converged, delta) = is_converged(&n_plus_1_scores, &n_plus_2_scores);
info!("ITER: {}, CONVERGED: {}, DELTA: {}", i, is_converged, delta);
if is_converged {
scores = n_plus_1_scores;
break;
} else {
scores = n_plus_2_scores;
}
i += 1;
}
info!(
"COMPUTE_END: {:?}, NUM_SCORES: {}, NUM_ITER: {}",
start.elapsed(),
scores.len(),
i
);
scores.into_iter().collect()
}
pub fn is_converged(scores: &BTreeMap<u64, f32>, next_scores: &BTreeMap<u64, f32>) -> (bool, f32) {
let total_delta = scores
.par_iter()
.fold(
|| 0.0,
|sum, (i, v)| {
let next_score = next_scores.get(i).unwrap_or(&0.0);
(next_score - v).abs() + sum
},
)
.reduce(|| 0.0, |sum_a, sum_b| sum_a + sum_b);
(total_delta <= DELTA, total_delta)
}
pub fn convergence_check(
mut lt: BTreeMap<u64, OutboundLocalTrust>,
mut seed: BTreeMap<u64, f32>,
scores: &BTreeMap<u64, f32>,
count: u64,
) -> bool {
info!(
"PRE_PROCESS_START, LT_SIZE: {}, SEED_SIZE: {}",
lt.len(),
seed.len()
);
pre_process(&mut lt, &mut seed, count);
info!(
"PRE_PROCESS_END. LT_SIZE: {}, SEED_SIZE: {}",
lt.len(),
seed.len()
);
info!("NORMALISE_LT_SEED");
seed = normalise_scores(&seed);
lt = normalise_lt(<);
info!("CONVERGENCE_START");
let start = Instant::now();
let next_scores = iteration(<, &seed, scores);
let next_scores = normalise_scores(&next_scores);
let (is_converged, delta) = is_converged(scores, &next_scores);
info!(
"CONVERGENCE_RESULT: {:?}, DELTA: {}, TIME: {:?}",
is_converged,
delta,
start.elapsed(),
);
is_converged
}
fn iteration(
lt: &BTreeMap<u64, OutboundLocalTrust>,
seed: &BTreeMap<u64, f32>,
scores: &BTreeMap<u64, f32>,
) -> BTreeMap<u64, f32> {
let mut next_scores = lt
.par_iter()
.map(|(from, from_map)| {
let origin_score = scores.get(from).unwrap_or(&0.0);
let mut partial = BTreeMap::new();
for (to, value) in from_map.outbound_trust_scores() {
let score = *value * origin_score;
let to_score = partial.get(to).unwrap_or(&0.0);
partial.insert(*to, to_score + score);
}
partial
})
.reduce(
|| BTreeMap::new(),
|mut acc, partial| {
for (k, v) in partial {
*acc.entry(k).or_insert(0.0) += v;
}
acc
},
);
for (i, v) in &mut next_scores {
let pre_trust = seed.get(i).unwrap_or(&0.0);
*v = PRE_TRUST_WEIGHT * pre_trust + *v * (1.0 - PRE_TRUST_WEIGHT);
}
next_scores
}