raphtory/algorithms/cores/
k_core.rs1use crate::{
2 core::{entities::VID, state::compute_state::ComputeStateVec},
3 db::{
4 api::view::{NodeViewOps, StaticGraphViewOps},
5 graph::views::node_subgraph::NodeSubgraph,
6 task::{
7 context::Context,
8 node::eval_node::EvalNodeView,
9 task::{ATask, Job, Step},
10 task_runner::TaskRunner,
11 },
12 },
13 prelude::GraphViewOps,
14};
15use std::collections::HashSet;
16
17#[derive(Clone, Debug)]
18struct KCoreState {
19 alive: bool,
20}
21
22impl Default for KCoreState {
23 fn default() -> Self {
24 Self { alive: true }
25 }
26}
27
28pub fn k_core_set<G>(g: &G, k: usize, iter_count: usize, threads: Option<usize>) -> HashSet<VID>
42where
43 G: StaticGraphViewOps,
44{
45 let ctx: Context<G, ComputeStateVec> = g.into();
46
47 let step1 = ATask::new(move |vv| {
48 let deg = vv.degree();
49 let state: &mut KCoreState = vv.get_mut();
50 state.alive = deg >= k;
51 Step::Continue
52 });
53
54 let step2 = ATask::new(move |vv: &mut EvalNodeView<_, KCoreState>| {
55 let prev: bool = vv.prev().alive;
56 if prev {
57 let current = vv
58 .neighbours()
59 .into_iter()
60 .filter(|n| n.prev().alive)
61 .count()
62 >= k;
63 let state: &mut KCoreState = vv.get_mut();
64 if current != prev {
65 state.alive = current;
66 Step::Continue
67 } else {
68 Step::Done
69 }
70 } else {
71 Step::Done
72 }
73 });
74
75 let mut runner: TaskRunner<G, _> = TaskRunner::new(ctx);
76
77 runner.run(
78 vec![Job::new(step1)],
79 vec![Job::read_only(step2)],
80 None,
81 |_, _, _, local| {
82 g.nodes()
83 .iter()
84 .filter(|node| local[node.node.0].alive)
85 .map(|node| node.node)
86 .collect()
87 },
88 threads,
89 iter_count,
90 None,
91 None,
92 )
93}
94
95pub fn k_core<G>(g: &G, k: usize, iter_count: usize, threads: Option<usize>) -> NodeSubgraph<G>
96where
97 G: StaticGraphViewOps,
98{
99 let v_set = k_core_set(g, k, iter_count, threads);
100 g.subgraph(v_set)
101}