Skip to main content

raphtory/algorithms/cores/
k_core.rs

1use crate::{
2    core::{entities::VID, state::compute_state::ComputeStateVec},
3    db::{
4        api::view::{NodeViewOps, StaticGraphViewOps},
5        graph::views::node_subgraph::NodeSubgraph,
6        task::{
7            context::Context,
8            node::eval_node::EvalNodeView,
9            task::{ATask, Job, Step},
10            task_runner::TaskRunner,
11        },
12    },
13    prelude::GraphViewOps,
14};
15use std::collections::HashSet;
16
/// Per-node state tracked while computing the k-core.
#[derive(Debug, Clone)]
struct KCoreState {
    /// `true` while the node is still considered part of the k-core.
    alive: bool,
}
21
22impl Default for KCoreState {
23    fn default() -> Self {
24        Self { alive: true }
25    }
26}
27
28/// Determines which nodes are in the k-core for a given value of k
29///
30/// # Arguments
31///
32/// - `g` - A reference to the graph
33/// - `k` - Value of k such that the returned nodes have degree > k (recursively)
34/// - `iter_count` - The number of iterations to run
35/// - `threads` - number of threads to run on
36///
37/// # Returns
38///
39/// A hash set of nodes in the k core
40///
41pub fn k_core_set<G>(g: &G, k: usize, iter_count: usize, threads: Option<usize>) -> HashSet<VID>
42where
43    G: StaticGraphViewOps,
44{
45    let ctx: Context<G, ComputeStateVec> = g.into();
46
47    let step1 = ATask::new(move |vv| {
48        let deg = vv.degree();
49        let state: &mut KCoreState = vv.get_mut();
50        state.alive = deg >= k;
51        Step::Continue
52    });
53
54    let step2 = ATask::new(move |vv: &mut EvalNodeView<_, KCoreState>| {
55        let prev: bool = vv.prev().alive;
56        if prev {
57            let current = vv
58                .neighbours()
59                .into_iter()
60                .filter(|n| n.prev().alive)
61                .count()
62                >= k;
63            let state: &mut KCoreState = vv.get_mut();
64            if current != prev {
65                state.alive = current;
66                Step::Continue
67            } else {
68                Step::Done
69            }
70        } else {
71            Step::Done
72        }
73    });
74
75    let mut runner: TaskRunner<G, _> = TaskRunner::new(ctx);
76
77    runner.run(
78        vec![Job::new(step1)],
79        vec![Job::read_only(step2)],
80        None,
81        |_, _, _, local| {
82            g.nodes()
83                .iter()
84                .filter(|node| local[node.node.0].alive)
85                .map(|node| node.node)
86                .collect()
87        },
88        threads,
89        iter_count,
90        None,
91        None,
92    )
93}
94
95pub fn k_core<G>(g: &G, k: usize, iter_count: usize, threads: Option<usize>) -> NodeSubgraph<G>
96where
97    G: StaticGraphViewOps,
98{
99    let v_set = k_core_set(g, k, iter_count, threads);
100    g.subgraph(v_set)
101}