Skip to main content

fso_graph_builder/input/
dotgraph.rs

1use std::{
2    convert::TryFrom,
3    fs::File,
4    hash::Hash,
5    io::Read,
6    marker::PhantomData,
7    mem::ManuallyDrop,
8    ops::Range,
9    path::Path,
10    sync::atomic::{AtomicUsize, Ordering::Acquire},
11};
12
13use atomic::Atomic;
14use dashmap::DashMap;
15use linereader::LineReader;
16use rayon::prelude::*;
17use rustc_hash::FxHashMap;
18
19use crate::{
20    graph::csr::{sort_targets, Csr},
21    graph::Target,
22    index::Idx,
23    Error, Graph, NodeValues, SharedMut, UndirectedDegrees, UndirectedNeighbors,
24};
25
26use super::{edgelist::EdgeList, InputCapabilities, InputPath};
27
28/// DotGraph (the name is based on the file ending `.graph`) is a textual
29/// description of a node labeled graph primarily used as input for subgraph
30/// isomorphism libraries. It has been introduced
31/// [here](https://github.com/RapidsAtHKUST/SubgraphMatching#input) and is also
32/// supported by the
33/// [subgraph-matching](https://crates.io/crates/subgraph-matching) crate.
34///
35/// A graph starts with 't N M' where N is the number of nodes and M is the
36/// number of edges. A node and an edge are formatted as 'v nodeId labelId
37/// degree' and 'e nodeId nodeId' respectively. Note that the format requires
38/// that the node id starts at 0 and the range is `0..N`.
39///
40/// # Example
41///
42/// The following graph contains 5 nodes and 6 relationships. The first line
43/// contains that meta information. The following 5 lines contain one node
44/// description per line, e.g., `v 0 0 2` translates to node `0` with label `0`
45/// and a degree of `2`. Following the nodes, the remaining lines describe
46/// edges, e.g., `e 0 1` represents an edge connecting nodes `0` and `1`.
47///
48/// ```ignore
49/// > cat my_graph.graph
50/// t 5 6
51/// v 0 0 2
52/// v 1 1 3
53/// v 2 2 3
54/// v 3 1 2
55/// v 4 2 2
56/// e 0 1
57/// e 0 2
58/// e 1 2
59/// e 1 3
60/// e 2 4
61/// e 3 4
62/// ```
63pub struct DotGraphInput<NI, Label>
64where
65    NI: Idx,
66    Label: Idx,
67{
68    _phantom: PhantomData<(NI, Label)>,
69}
70
71impl<NI, Label> Default for DotGraphInput<NI, Label>
72where
73    NI: Idx,
74    Label: Idx,
75{
76    fn default() -> Self {
77        Self {
78            _phantom: PhantomData,
79        }
80    }
81}
82
83impl<NI: Idx, Label: Idx> InputCapabilities<NI> for DotGraphInput<NI, Label> {
84    type GraphInput = DotGraph<NI, Label>;
85}
86
87pub struct DotGraph<NI, Label>
88where
89    NI: Idx,
90    Label: Idx,
91{
92    pub labels: Vec<Label>,
93    pub edge_list: EdgeList<NI, ()>,
94    pub max_degree: NI,
95    pub max_label: Label,
96    pub label_frequency: FxHashMap<Label, usize>,
97}
98
99impl<NI, Label> DotGraph<NI, Label>
100where
101    NI: Idx,
102    Label: Idx + Hash,
103{
104    pub fn node_count(&self) -> NI {
105        NI::new(self.labels.len())
106    }
107
108    pub fn label_count(&self) -> usize {
109        self.max_label.index() + 1
110    }
111
112    pub fn max_label_frequency(&self) -> usize {
113        self.label_frequency
114            .values()
115            .max()
116            .cloned()
117            .unwrap_or_default()
118    }
119}
120
121impl<NI, Label, P> TryFrom<InputPath<P>> for DotGraph<NI, Label>
122where
123    P: AsRef<Path>,
124    NI: Idx,
125    Label: Idx + Hash,
126{
127    type Error = Error;
128
129    fn try_from(path: InputPath<P>) -> Result<Self, Self::Error> {
130        let file = File::open(path.0.as_ref())?;
131        let reader = LineReader::new(file);
132        let dot_graph = DotGraph::try_from(reader)?;
133        Ok(dot_graph)
134    }
135}
136
137impl<NI, Label, R> TryFrom<LineReader<R>> for DotGraph<NI, Label>
138where
139    NI: Idx,
140    Label: Idx + Hash,
141    R: Read,
142{
143    type Error = Error;
144
145    /// Converts the given .graph input into a [`DotGraph`].
146    fn try_from(mut lines: LineReader<R>) -> Result<Self, Self::Error> {
147        let mut header = lines.next_line().expect("missing header line")?;
148
149        // skip "t" char and white space
150        header = &header[2..];
151        let (node_count, used) = NI::parse(header);
152        header = &header[used + 1..];
153        let (edge_count, _) = NI::parse(header);
154
155        let mut labels = Vec::<Label>::with_capacity(node_count.index());
156        let mut edges = Vec::with_capacity(edge_count.index());
157
158        let mut max_degree = NI::zero();
159        let mut max_label = Label::zero();
160        let mut label_frequency = FxHashMap::<Label, usize>::default();
161
162        let mut batch = lines.next_batch().expect("missing data")?;
163
164        for _ in 0..node_count.index() {
165            if batch.is_empty() {
166                batch = lines.next_batch().expect("missing data")?;
167            }
168
169            // skip "v" char and white space
170            batch = &batch[2..];
171            // skip node id since input is always sorted by node id
172            let (_, used) = NI::parse(batch);
173            batch = &batch[used + 1..];
174            let (label, used) = Label::parse(batch);
175            batch = &batch[used + 1..];
176            let (degree, used) = NI::parse(batch);
177            batch = &batch[used + 1..];
178
179            labels.push(label);
180
181            if degree > max_degree {
182                max_degree = degree;
183            }
184
185            let frequency = label_frequency.entry(label).or_insert_with(|| {
186                if label > max_label {
187                    max_label = label;
188                }
189                0
190            });
191            *frequency += 1;
192        }
193
194        for _ in 0..edge_count.index() {
195            if batch.is_empty() {
196                batch = lines.next_batch().expect("missing data")?;
197            }
198            // skip "e" char and white space
199            batch = &batch[2..];
200            let (source, used) = NI::parse(batch);
201            batch = &batch[used + 1..];
202            let (target, used) = NI::parse(batch);
203            batch = &batch[used + 1..];
204
205            edges.push((source, target, ()));
206        }
207
208        let edge_list = EdgeList::new(edges);
209
210        Ok(Self {
211            labels,
212            edge_list,
213            max_degree,
214            max_label,
215            label_frequency,
216        })
217    }
218}
219
220pub struct LabelStats<NI, Label> {
221    pub max_degree: NI,
222    pub label_count: usize,
223    pub max_label: Label,
224    pub max_label_frequency: usize,
225    pub label_frequency: FxHashMap<Label, usize>,
226}
227
228impl<NI, Label> LabelStats<NI, Label>
229where
230    NI: Idx,
231    Label: Idx + Hash,
232{
233    pub fn from_graph<G>(graph: &G) -> Self
234    where
235        G: Graph<NI>
236            + UndirectedNeighbors<NI>
237            + UndirectedDegrees<NI>
238            + NodeValues<NI, Label>
239            + Send
240            + Sync,
241    {
242        graph.into()
243    }
244}
245
246impl<NI, Label, G> From<&G> for LabelStats<NI, Label>
247where
248    NI: Idx,
249    Label: Idx + Hash,
250    G: Graph<NI>
251        + UndirectedNeighbors<NI>
252        + UndirectedDegrees<NI>
253        + NodeValues<NI, Label>
254        + Send
255        + Sync,
256{
257    fn from(graph: &G) -> Self {
258        let label_frequency: DashMap<Label, usize> = DashMap::new();
259        let max_degree = AtomicUsize::new(usize::MIN);
260        let max_label = AtomicUsize::new(usize::MIN);
261
262        rayon::iter::split(0..graph.node_count().index(), |range| {
263            if range.len() <= 1 {
264                return (range, None);
265            }
266            let pivot = range.start + (range.end - range.start) / 2;
267            (range.start..pivot, Some(pivot..range.end))
268        })
269        .into_par_iter()
270        .for_each(|range: Range<usize>| {
271            let mut local_max_degree = NI::new(usize::MIN);
272            let mut local_max_label = Label::new(usize::MIN);
273
274            range.into_iter().for_each(|node| {
275                let node = NI::new(node);
276                let label = graph.node_value(node);
277
278                let mut frequency = label_frequency.entry(*label).or_insert_with(|| {
279                    if *label > local_max_label {
280                        local_max_label = *label;
281                    }
282                    0_usize
283                });
284                *frequency += 1;
285
286                local_max_degree = NI::max(local_max_degree, graph.degree(node));
287            });
288
289            update_max_value(local_max_label.index(), &max_label);
290            update_max_value(local_max_degree.index(), &max_degree);
291        });
292
293        let max_degree = NI::new(max_degree.load(atomic::Ordering::Acquire));
294        let max_label = Label::new(max_label.load(atomic::Ordering::Acquire));
295        let max_label_frequency = label_frequency
296            .iter()
297            .map(|ref_multi| *ref_multi.value())
298            .max()
299            .unwrap_or_default();
300        let label_count = label_frequency.len();
301        let label_frequency = label_frequency
302            .into_iter()
303            .collect::<FxHashMap<Label, usize>>();
304
305        Self {
306            max_degree,
307            label_count,
308            max_label,
309            max_label_frequency,
310            label_frequency,
311        }
312    }
313}
314
/// Raises the value stored in `shared` to `new_value` if `new_value` is
/// larger; otherwise the stored value is left as it is.
///
/// The update is a single atomic read-modify-write operation, so concurrent
/// callers never lower the stored maximum.
fn update_max_value(new_value: usize, shared: &AtomicUsize) {
    shared.fetch_max(new_value, atomic::Ordering::AcqRel);
}
337
338pub struct NeighborLabelFrequency<'a, Label> {
339    map: &'a FxHashMap<Label, usize>,
340}
341
342impl<'a, Label> NeighborLabelFrequency<'a, Label>
343where
344    Label: Hash + Eq,
345{
346    fn new(map: &'a FxHashMap<Label, usize>) -> Self {
347        Self { map }
348    }
349
350    pub fn get(&self, label: Label) -> Option<usize> {
351        self.map.get(&label).copied()
352    }
353
354    pub fn len(&self) -> usize {
355        self.map.len()
356    }
357
358    pub fn is_empty(&self) -> bool {
359        self.len() == 0
360    }
361
362    pub fn iter(&self) -> std::collections::hash_map::Iter<'_, Label, usize> {
363        self.map.iter()
364    }
365}
366
367pub struct NeighborLabelFrequencies<Label, NI> {
368    pub frequencies: Vec<FxHashMap<Label, usize>>,
369    _node_type: PhantomData<NI>,
370}
371
372impl<Label, NI> NeighborLabelFrequencies<Label, NI>
373where
374    NI: Idx,
375    Label: Idx + Hash,
376{
377    pub fn from_graph<G>(graph: &G) -> Self
378    where
379        G: Graph<NI>
380            + UndirectedNeighbors<NI>
381            + UndirectedDegrees<NI>
382            + NodeValues<NI, Label>
383            + Send
384            + Sync,
385    {
386        graph.into()
387    }
388
389    pub fn neighbor_frequency(&self, node: NI) -> NeighborLabelFrequency<'_, Label> {
390        NeighborLabelFrequency::new(&self.frequencies[node.index()])
391    }
392}
393
394impl<Label, G, NI> From<&G> for NeighborLabelFrequencies<Label, NI>
395where
396    NI: Idx,
397    Label: Idx + Hash,
398    G: Graph<NI>
399        + UndirectedNeighbors<NI>
400        + UndirectedDegrees<NI>
401        + NodeValues<NI, Label>
402        + Send
403        + Sync,
404{
405    fn from(graph: &G) -> Self {
406        let mut frequencies = Vec::with_capacity(graph.node_count().index());
407
408        (0..graph.node_count().index())
409            .into_par_iter()
410            .map(|node| {
411                let mut frequency = FxHashMap::<Label, usize>::default();
412
413                for &target in graph.neighbors(NI::new(node)) {
414                    let target_label = graph.node_value(target);
415                    let count = frequency.entry(*target_label).or_insert(0);
416                    *count += 1;
417                }
418
419                frequency
420            })
421            .collect_into_vec(&mut frequencies);
422
423        Self {
424            frequencies,
425            _node_type: PhantomData,
426        }
427    }
428}
429
430pub struct NodeLabelIndex<Label, NI>(Csr<Label, NI, ()>)
431where
432    NI: Idx,
433    Label: Idx;
434
435impl<Label, NI> NodeLabelIndex<Label, NI>
436where
437    NI: Idx,
438    Label: Idx,
439{
440    pub fn from_stats<F>(node_count: NI, label_stats: &LabelStats<NI, Label>, label_func: F) -> Self
441    where
442        Label: Hash,
443        F: Fn(NI) -> Label + Send + Sync,
444    {
445        (node_count, label_stats, label_func).into()
446    }
447
448    pub fn nodes(&self, label: Label) -> &[NI] {
449        self.0.targets(label)
450    }
451}
452
453impl<Label, NI, F> From<(NI, &LabelStats<NI, Label>, F)> for NodeLabelIndex<Label, NI>
454where
455    NI: Idx,
456    Label: Idx + Hash,
457    F: Fn(NI) -> Label + Send + Sync,
458{
459    fn from((node_count, label_stats, label_func): (NI, &LabelStats<NI, Label>, F)) -> Self {
460        let LabelStats {
461            label_count,
462            max_label,
463            label_frequency,
464            ..
465        } = label_stats;
466        // Prefix sum: We insert the offset entries one index to the right and
467        // increment the offset of the next label during insert. That way we'll
468        // end up with the correct offsets after inserting into `nodes` in the
469        // next loop.
470        let mut offsets = Vec::with_capacity(label_count.index() + 1);
471        offsets.push(Label::zero());
472
473        let mut total = Label::zero();
474        for label in Label::zero().range_inclusive(*max_label) {
475            offsets.push(total);
476            total += Label::new(*label_frequency.get(&label).unwrap_or(&0));
477        }
478
479        let offsets = {
480            let mut offsets = ManuallyDrop::new(offsets);
481            let (ptr, len, cap) = (offsets.as_mut_ptr(), offsets.len(), offsets.capacity());
482
483            // SAFETY: Label and Label::Atomic have the same memory layout
484            unsafe {
485                let ptr = ptr as *mut Atomic<Label>;
486                Vec::from_raw_parts(ptr, len, cap)
487            }
488        };
489
490        let mut nodes = Vec::<Target<NI, ()>>::with_capacity(node_count.index());
491        let nodes_ptr = SharedMut::new(nodes.as_mut_ptr());
492
493        (0..node_count.index()).into_par_iter().for_each(|node| {
494            let label = label_func(NI::new(node));
495            let next_label = label + Label::new(1);
496            let offset = Label::get_and_increment(&offsets[next_label.index()], Acquire);
497            // SAFETY: There is exactly one thread that writes at `offset.index()`.
498            unsafe {
499                nodes_ptr
500                    .add(offset.index())
501                    .write(Target::new(NI::new(node), ()));
502            }
503        });
504
505        // SAFETY: The `nodes` vec has `node_count` length and we performed an
506        // insert operation for each index (node). Each inserts happens at a
507        // unique index which is computed from the `offset` array.
508        unsafe {
509            nodes.set_len(node_count.index());
510        }
511
512        let offsets = {
513            let mut offsets = ManuallyDrop::new(offsets);
514            let (ptr, len, cap) = (offsets.as_mut_ptr(), offsets.len(), offsets.capacity());
515
516            // SAFETY: Label and Label::Atomic have the same memory layout
517            unsafe {
518                let ptr = ptr as *mut _;
519                Vec::from_raw_parts(ptr, len, cap)
520            }
521        };
522
523        sort_targets(&offsets, &mut nodes);
524
525        let offsets = offsets.into_boxed_slice();
526        let nodes = nodes.into_boxed_slice();
527
528        let csr = Csr::new(offsets, nodes);
529
530        Self(csr)
531    }
532}
533
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use crate::input::edgelist::Edges;
    use crate::input::InputPath;
    use crate::{CsrLayout, UndirectedCsrGraph};

    use super::*;

    const TEST_GRAPH: [&str; 3] = [env!("CARGO_MANIFEST_DIR"), "resources", "test.graph"];

    /// Parses the test resource into a [`DotGraph`].
    fn parse_test_graph() -> DotGraph<usize, usize> {
        let path = TEST_GRAPH.iter().collect::<PathBuf>();
        DotGraph::<usize, usize>::try_from(InputPath(path.as_path())).unwrap()
    }

    /// Parses the test resource and builds a sorted undirected CSR graph.
    fn build_test_graph() -> UndirectedCsrGraph<usize, usize> {
        UndirectedCsrGraph::<usize, usize>::from((parse_test_graph(), CsrLayout::Sorted))
    }

    #[test]
    fn dotgraph_from_file() {
        let graph = parse_test_graph();

        assert_eq!(graph.labels.len(), 5);
        assert_eq!(graph.edge_list.len(), 6);
        assert_eq!(graph.max_label, 2);
        assert_eq!(graph.max_degree, 3);
    }

    #[test]
    fn label_test() {
        let graph = parse_test_graph();

        assert_eq!(graph.max_label_frequency(), 2);
    }

    #[test]
    fn label_stats_test() {
        let graph = build_test_graph();

        let label_stats = LabelStats::from_graph(&graph);

        assert_eq!(label_stats.max_degree, 3);
        assert_eq!(label_stats.max_label, 2);
        assert_eq!(label_stats.max_label_frequency, 2);
        assert_eq!(label_stats.label_frequency[&0], 1);
        assert_eq!(label_stats.label_frequency[&1], 2);
        assert_eq!(label_stats.label_frequency[&2], 2);
    }

    #[test]
    fn neighbor_label_frequency_test() {
        let graph = build_test_graph();

        let nlf = NeighborLabelFrequencies::from_graph(&graph);

        // Row `n` holds the expected frequencies of labels 0, 1 and 2 among
        // the neighbors of node `n`.
        let expected: [[Option<usize>; 3]; 5] = [
            [None, Some(1), Some(1)],
            [Some(1), Some(1), Some(1)],
            [Some(1), Some(1), Some(1)],
            [None, Some(1), Some(1)],
            [None, Some(1), Some(1)],
        ];

        for (node, per_label) in expected.iter().enumerate() {
            let frequency = nlf.neighbor_frequency(node);
            for (label, count) in per_label.iter().enumerate() {
                assert_eq!(frequency.get(label), *count);
            }
        }
    }

    #[test]
    fn node_label_index_test() {
        let graph = build_test_graph();
        let label_stats = LabelStats::from_graph(&graph);

        let label_of = |node| *graph.node_value(node);
        let idx = NodeLabelIndex::from_stats(graph.node_count(), &label_stats, label_of);

        assert_eq!(idx.nodes(0), &[0]);
        assert_eq!(idx.nodes(1), &[1, 3]);
        assert_eq!(idx.nodes(2), &[2, 4]);
    }
}