use std::cmp::{min, Eq, PartialEq, Reverse};
use std::collections::hash_map::DefaultHasher;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use fxhash::FxHashMap;
use roaring::RoaringBitmap;
use crate::dachshund::error::{CLQError, CLQResult};
use crate::dachshund::id_types::{GraphId, NodeLabel, NodeTypeIdInternal};
use crate::dachshund::node::{Node, NodeBase};
use crate::dachshund::row::CliqueRow;
use crate::dachshund::scorer::Scorer;
use crate::dachshund::typed_graph::LabeledGraph;
use std::sync::mpsc::Sender;
#[derive(Clone)]
pub struct LocalDensityGuarantee {
pub num_edges: usize,
pub exceptions: RoaringBitmap,
}
#[derive(Clone)]
pub struct Recipe {
pub checksum: Option<u64>,
pub node_id: Option<u32>,
pub score: Option<f32>,
pub local_guarantee: Option<LocalDensityGuarantee>,
}
impl PartialEq for Recipe {
fn eq(&self, other: &Recipe) -> bool {
(self.checksum == other.checksum) && (self.node_id == other.node_id)
}
}
impl Eq for Recipe {}
impl Hash for Recipe {
fn hash<H: Hasher>(&self, state: &mut H) {
if let Some(node_id) = self.node_id {
merge_checksum(self.checksum, node_id).unwrap().hash(state);
} else {
self.checksum.unwrap().hash(state);
}
}
}
type NeigbhorhoodMap = HashMap<u32, u32>;
pub struct Candidate<'a, TGraph>
where
TGraph: LabeledGraph,
{
pub graph: &'a TGraph,
pub core_ids: RoaringBitmap,
pub non_core_ids: RoaringBitmap,
pub checksum: Option<u64>,
score: Option<f32>,
max_core_node_edges: usize,
ties_between_nodes: usize,
local_guarantee: LocalDensityGuarantee,
neighborhood: NeigbhorhoodMap,
node_counts: Vec<usize>,
}
impl<'a, T: LabeledGraph> Hash for Candidate<'a, T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.checksum.unwrap().hash(state);
}
}
impl<'a, T: LabeledGraph> PartialEq for Candidate<'a, T> {
fn eq(&self, other: &Self) -> bool {
self.checksum == other.checksum && self.score == other.score
}
}
impl<'a, T: LabeledGraph> Eq for Candidate<'a, T> {}
impl<'a, T: LabeledGraph> fmt::Display for Candidate<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.checksum.unwrap())
}
}
impl<'a, TGraph: LabeledGraph> Candidate<'a, TGraph>
where
TGraph: LabeledGraph<NodeType = Node>,
{
pub fn init_blank(graph: &'a TGraph, num_non_core_types: usize) -> Self {
Self {
graph,
core_ids: RoaringBitmap::new(),
non_core_ids: RoaringBitmap::new(),
checksum: None,
score: None,
max_core_node_edges: 0,
ties_between_nodes: 0,
local_guarantee: LocalDensityGuarantee {
num_edges: 0,
exceptions: RoaringBitmap::new(),
},
neighborhood: HashMap::new(),
node_counts: vec![0; num_non_core_types + 1],
}
}
pub fn new(node_id: u32, graph: &'a TGraph, scorer: &Scorer) -> CLQResult<Self> {
let mut candidate: Self = Candidate::init_blank(graph, scorer.get_num_non_core_types());
candidate.add_node(node_id)?;
let score = scorer.score(&mut candidate)?;
candidate.set_score(score)?;
Ok(candidate)
}
pub fn from_clique_rows(
rows: &'a Vec<CliqueRow>,
graph: &'a TGraph,
scorer: &Scorer,
) -> CLQResult<Option<Self>> {
assert!(!rows.is_empty());
let mut candidate: Candidate<TGraph> =
Candidate::init_blank(graph, scorer.get_num_non_core_types());
for row in rows {
if graph.has_node_by_label(row.node_id) {
let node = graph.get_node_by_label(row.node_id);
assert_eq!(node.non_core_type, row.target_type);
candidate.add_node(node.node_id)?;
}
}
if candidate.checksum.is_none() {
return Ok(None);
}
let score = scorer.score(&mut candidate)?;
candidate.set_score(score)?;
Ok(Some(candidate))
}
pub fn add_node(&mut self, node_id: u32) -> CLQResult<()> {
self.checksum = merge_checksum(self.checksum, node_id);
if self.graph.get_node(node_id).is_core() {
self.core_ids.insert(node_id);
self.local_guarantee.exceptions.insert(node_id);
self.node_counts[0_usize] += 1;
} else {
self.non_core_ids.insert(node_id);
self.increment_max_core_node_edges(node_id)?;
self.node_counts[self.graph.get_node(node_id).non_core_type.unwrap().value()] += 1;
}
self.increment_ties_between_nodes(node_id);
self.reset_score();
self.adjust_neighborhood(node_id);
Ok(())
}
pub fn sorted_core_labels(&self, reverse_labels_map: &FxHashMap<u32, NodeLabel>) -> Vec<i64> {
let mut vec: Vec<i64> = self
.core_ids
.iter()
.map(|x| reverse_labels_map[&x].value())
.collect();
vec.sort();
vec
}
pub fn as_recipe(&self) -> Recipe {
Recipe {
checksum: self.checksum,
node_id: None,
score: self.score,
local_guarantee: Some(self.local_guarantee.clone()),
}
}
pub fn sorted_non_core_labels(
&self,
reverse_labels_map: &FxHashMap<u32, NodeLabel>,
) -> Vec<i64> {
let mut vec: Vec<i64> = self
.non_core_ids
.iter()
.map(|x| reverse_labels_map[&x].value())
.collect();
vec.sort();
vec
}
pub fn set_score(&mut self, score: f32) -> CLQResult<()> {
if self.score.is_some() {
return Err(CLQError::from(
"Tried to set score on an already scored candidate.",
));
}
self.score = Some(score);
Ok(())
}
fn reset_score(&mut self) {
self.score = None;
}
pub fn get_node(&self, node_id: u32) -> &Node {
self.graph.get_node(node_id)
}
pub fn get_score(&self) -> CLQResult<f32> {
let score = self
.score
.ok_or("Tried to get score from an unscored candidate.")?;
Ok(score)
}
pub fn get_neighborhood(&self) -> NeigbhorhoodMap {
self.neighborhood.clone()
}
pub fn get_local_guarantee(&self) -> LocalDensityGuarantee {
self.local_guarantee.clone()
}
pub fn get_node_counts(&self) -> Vec<usize> {
self.node_counts.clone()
}
pub fn to_printable_row(
&self,
target_types: &[String],
reverse_labels_map: FxHashMap<u32, NodeLabel>,
) -> CLQResult<String> {
let encode_err_handler = |e: serde_json::Error| Err(CLQError::from(e.to_string()));
let cliqueness = self.get_cliqueness()?;
let core_ids: Vec<i64> = self.sorted_core_labels(&reverse_labels_map);
let non_core_ids: Vec<i64> = self.sorted_non_core_labels(&reverse_labels_map);
let mut s = String::new();
s.push_str(&core_ids.len().to_string());
s.push('\t');
s.push_str(&non_core_ids.len().to_string());
s.push('\t');
s.push_str(&serde_json::to_string(&core_ids).or_else(encode_err_handler)?);
s.push('\t');
s.push_str(&serde_json::to_string(&non_core_ids).or_else(encode_err_handler)?);
s.push('\t');
let non_core_types_str: Vec<String> = self
.non_core_ids
.clone()
.into_iter()
.map(|id| target_types[self.get_node(id).non_core_type.unwrap().value() - 1].clone())
.collect();
s.push_str(&serde_json::to_string(&non_core_types_str).or_else(encode_err_handler)?);
s.push('\t');
s.push_str(&cliqueness.to_string());
s.push('\t');
s.push_str(&serde_json::to_string(&self.get_core_densities()).or_else(encode_err_handler)?);
s.push('\t');
s.push_str(
&serde_json::to_string(&self.get_non_core_densities(target_types.len())?)
.or_else(encode_err_handler)?,
);
Ok(s)
}
pub fn get_output_rows(
&self,
graph_id: GraphId,
reverse_labels_map: FxHashMap<u32, NodeLabel>,
) -> CLQResult<Vec<CliqueRow>> {
let mut out: Vec<CliqueRow> = Vec::new();
for core_id in self.sorted_core_labels(&reverse_labels_map) {
let row: CliqueRow = CliqueRow {
graph_id,
node_id: core_id.into(),
target_type: None,
};
out.push(row);
}
for non_core_id in self.sorted_non_core_labels(&reverse_labels_map) {
let non_core_node = self.graph.get_node_by_label(non_core_id.into());
let row: CliqueRow = CliqueRow {
graph_id,
node_id: non_core_id.into(),
target_type: non_core_node.non_core_type,
};
out.push(row);
}
Ok(out)
}
pub fn print(
&self,
graph_id: GraphId,
target_types: &[String],
core_type: &str,
output: &Sender<(Option<String>, bool)>,
) -> CLQResult<()> {
for output_row in &self.get_output_rows(graph_id, self.graph.get_reverse_labels_map())? {
let node_type: String = match output_row.target_type {
Some(t) => target_types[t.value() - 1].clone(),
None => core_type.to_string(),
};
output
.send((
Some(format!(
"{}\t{}\t{}",
graph_id.value(),
output_row.node_id.value(),
node_type
)),
false,
))
.unwrap();
}
Ok(())
}
pub fn replicate(&self, keep_score: bool) -> Self {
let mut new_neighborhood = self.neighborhood.clone();
if 2 * new_neighborhood.capacity() > new_neighborhood.len() {
new_neighborhood.shrink_to_fit()
}
Self {
graph: self.graph,
core_ids: self.core_ids.clone(),
non_core_ids: self.non_core_ids.clone(),
checksum: self.checksum,
score: match keep_score {
true => self.score,
false => None,
},
max_core_node_edges: self.max_core_node_edges,
ties_between_nodes: self.ties_between_nodes,
local_guarantee: self.local_guarantee.clone(),
neighborhood: new_neighborhood,
node_counts: self.node_counts.clone(),
}
}
pub fn expand_from_recipe(&self, recipe: &Recipe) -> CLQResult<Self> {
let mut candidate = self.replicate(false);
if let Some(node_id) = recipe.node_id {
if self.get_node(node_id).is_core() {
assert!(!candidate.core_ids.contains(node_id));
assert!(!self.core_ids.contains(node_id));
} else {
assert!(!candidate.non_core_ids.contains(node_id));
assert!(!self.non_core_ids.contains(node_id));
}
candidate.add_node(node_id)?;
candidate.score = recipe.score;
if let Some(local_guarantee) = &recipe.local_guarantee {
candidate.local_guarantee = local_guarantee.clone();
}
} else {
candidate.score = self.score;
}
Ok(candidate)
}
fn get_expansion_candidates(
&self,
num_to_search: usize,
visited_candidates: &mut HashSet<u64>,
) -> CLQResult<Vec<Recipe>> {
assert!(!visited_candidates.contains(&self.checksum.unwrap()));
let mut h = BinaryHeap::with_capacity(num_to_search);
for (node_id, num_ties) in self.neighborhood.iter() {
let heap_element = (Reverse(num_ties), node_id);
if h.len() < num_to_search {
h.push(heap_element);
} else if heap_element < *h.peek().unwrap() {
h.pop();
h.push(heap_element);
}
}
let mut expansion_candidates: Vec<Recipe> = Vec::with_capacity(num_to_search);
for (_num_ties, &node_id) in h.into_sorted_vec().iter() {
let recipe = Recipe {
checksum: self.checksum,
node_id: Some(node_id),
score: None,
local_guarantee: None,
};
let new_checksum = merge_checksum(self.checksum, node_id).unwrap();
if !visited_candidates.contains(&new_checksum) {
expansion_candidates.push(recipe);
}
}
assert!(self.checksum.unwrap() != 0);
visited_candidates.insert(self.checksum.unwrap());
Ok(expansion_candidates)
}
pub fn one_step_search(
&self,
num_to_search: usize,
visited_candidates: &mut HashSet<u64>,
scorer: &Scorer,
) -> CLQResult<Vec<Recipe>> {
let mut expansion_recipes: Vec<Recipe> =
self.get_expansion_candidates(num_to_search, visited_candidates)?;
for recipe in &mut expansion_recipes {
let score = scorer.score_recipe(recipe, self)?;
recipe.score = Some(score);
}
Ok(expansion_recipes)
}
pub fn get_size(&self) -> CLQResult<usize> {
Ok(self.core_ids.len() as usize * self.max_core_node_edges)
}
pub fn get_size_with_node(&self, node: &Node) -> CLQResult<usize> {
if node.is_core() {
Ok((self.core_ids.len() as usize + 1) * self.max_core_node_edges)
} else {
let new_edge_count = node
.max_edge_count_with_core_node()?
.ok_or_else(CLQError::err_none)?;
Ok(self.core_ids.len() as usize * (self.max_core_node_edges + new_edge_count))
}
}
fn increment_max_core_node_edges(&mut self, node_id: u32) -> CLQResult<()> {
let new_edge_count = self
.get_node(node_id)
.max_edge_count_with_core_node()?
.ok_or_else(CLQError::err_none)?;
self.max_core_node_edges += new_edge_count;
Ok(())
}
pub fn get_cliqueness(&self) -> CLQResult<f32> {
let size = self.get_size()?;
let ties_between_nodes = self.count_ties_between_nodes()?;
let cliqueness: f32 = if size > 0 {
ties_between_nodes as f32 / size as f32
} else {
1.0
};
Ok(cliqueness)
}
pub fn get_cliqueness_with_node(&self, node: &Node) -> CLQResult<f32> {
let size = self.get_size_with_node(node)?;
let new_ties = if node.is_core() {
node.count_ties_with_ids(&self.non_core_ids)
} else {
node.count_ties_with_ids(&self.core_ids)
};
let ties_between_nodes = self.count_ties_between_nodes()? + new_ties;
let cliqueness: f32 = if size > 0 {
ties_between_nodes as f32 / size as f32
} else {
1.0
};
Ok(cliqueness)
}
pub fn local_thresh_score_with_node_at_least(
&self,
thresh: f32,
node: &Node,
) -> (bool, Option<LocalDensityGuarantee>) {
if thresh == 0.0 {
return (true, None);
}
let new_max_core_node_edges = if node.is_core() {
self.max_core_node_edges
} else {
self.max_core_node_edges + node.max_edge_count_with_core_node().unwrap().unwrap()
};
let implied_edge_thresh = (thresh * new_max_core_node_edges as f32).ceil() as usize;
let check_all = self.local_guarantee.num_edges < implied_edge_thresh;
let nodes_to_check = if !check_all {
&self.local_guarantee.exceptions
} else {
&self.core_ids
};
let mut min_edges = None;
for node_id in nodes_to_check {
let mut edge_count = self
.get_node(node_id)
.count_ties_with_ids(&self.non_core_ids);
if !node.is_core() {
edge_count += node.count_ties_with_id(node_id)
}
if edge_count < implied_edge_thresh {
return (false, None);
}
match min_edges {
Some(num) => min_edges = Some(min(edge_count, num)),
None => min_edges = Some(edge_count),
}
}
if node.is_core() {
let new_edge_count = node.count_ties_with_ids(&self.non_core_ids);
if new_edge_count < implied_edge_thresh {
return (false, None);
}
match min_edges {
Some(num) => min_edges = Some(min(new_edge_count, num)),
None => min_edges = Some(new_edge_count),
}
}
let mut new_num_edges = min_edges.unwrap_or(self.local_guarantee.num_edges);
if !check_all {
new_num_edges = min(self.local_guarantee.num_edges, new_num_edges);
}
(
true,
Some(LocalDensityGuarantee {
num_edges: new_num_edges,
exceptions: RoaringBitmap::new(),
}),
)
}
pub fn local_thresh_score_at_least(&mut self, thresh: f32) -> bool {
if thresh == 0.0 {
return true;
}
let implied_edge_thresh = (thresh * self.max_core_node_edges as f32).ceil() as usize;
let check_all = self.local_guarantee.num_edges < implied_edge_thresh;
let nodes_to_check = if !check_all {
&self.local_guarantee.exceptions
} else {
&self.core_ids
};
let mut min_edges = None;
for node_id in nodes_to_check {
let edge_count = self
.get_node(node_id)
.count_ties_with_ids(&self.non_core_ids);
if edge_count < implied_edge_thresh {
return false;
}
match min_edges {
Some(num) => min_edges = Some(min(edge_count, num)),
None => min_edges = Some(edge_count),
}
}
let mut new_num_edges = min_edges.unwrap_or(self.local_guarantee.num_edges);
if !check_all {
new_num_edges = min(self.local_guarantee.num_edges, new_num_edges);
}
self.local_guarantee = LocalDensityGuarantee {
num_edges: new_num_edges,
exceptions: RoaringBitmap::new(),
};
true
}
pub fn is_clique(&self) -> CLQResult<bool> {
Ok(self.count_ties_between_nodes()? == self.get_size()?)
}
pub fn count_ties_between_nodes(&self) -> CLQResult<usize> {
Ok(self.ties_between_nodes)
}
fn increment_ties_between_nodes(&mut self, node_id: u32) {
let new_ties = if self.graph.get_node(node_id).is_core() {
self.get_node(node_id)
.count_ties_with_ids(&self.non_core_ids)
} else {
self.get_node(node_id).count_ties_with_ids(&self.core_ids)
};
self.ties_between_nodes += new_ties;
}
fn adjust_neighborhood(&mut self, node_id: u32) {
let opposite_shore = if self.graph.get_node(node_id).is_core() {
&self.non_core_ids
} else {
&self.core_ids
};
let neighbors: Vec<u32> = self
.get_node(node_id)
.edges
.iter()
.map(|x| x.target_id)
.collect();
for target_id in neighbors {
if !opposite_shore.contains(target_id) {
let counter = self.neighborhood.entry(target_id).or_insert(0);
*counter += 1;
}
}
self.neighborhood.remove(&node_id);
}
fn get_non_core_densities(&self, num_non_core_types: usize) -> CLQResult<Vec<f32>> {
let mut non_core_max_counts: Vec<usize> = vec![0; num_non_core_types + 1];
let mut non_core_out_counts: Vec<usize> = vec![0; num_non_core_types + 1];
for non_core_id in &self.non_core_ids {
let non_core = self.get_node(non_core_id);
let non_core_type_id: NodeTypeIdInternal = non_core
.non_core_type
.ok_or_else(CLQError::err_none)?
.value();
let num_ties: usize = non_core.count_ties_with_ids(&self.core_ids);
let max_density = non_core
.max_edge_count_with_core_node()?
.ok_or_else(CLQError::err_none)?;
non_core_max_counts[non_core_type_id] += max_density * (self.core_ids.len() as usize);
non_core_out_counts[non_core_type_id] += num_ties;
}
let mut non_core_density: Vec<f32> = Vec::new();
for i in 1..non_core_max_counts.len() {
non_core_density.push(non_core_out_counts[i] as f32 / non_core_max_counts[i] as f32);
}
Ok(non_core_density)
}
fn get_core_densities(&self) -> Vec<f32> {
let mut counts: Vec<f32> = Vec::new();
let max_size: usize = self
.non_core_ids
.iter()
.map(|id| {
self.get_node(id)
.max_edge_count_with_core_node()
.unwrap()
.unwrap()
})
.sum();
for node_id in &self.core_ids {
let node = self.get_node(node_id);
let num_ties: usize = node.count_ties_with_ids(&self.non_core_ids);
counts.push(num_ties as f32 / max_size as f32);
}
counts
}
}
fn merge_checksum(checksum: Option<u64>, node_id: u32) -> Option<u64> {
let mut s = DefaultHasher::new();
node_id.hash(&mut s);
let node_hash: u64 = s.finish();
if let Some(candidate_hash) = checksum {
Some(candidate_hash.wrapping_add(node_hash))
} else {
Some(node_hash)
}
}