lib_dachshund/dachshund/
core_transformer.rs

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
7extern crate clap;
8extern crate serde_json;
9
10use crate::dachshund::algorithms::coreness::Coreness;
11use crate::dachshund::error::CLQResult;
12use crate::dachshund::graph_builder_base::GraphBuilderBase;
13use crate::dachshund::id_types::{GraphId, NodeId};
14use crate::dachshund::line_processor::{LineProcessor, LineProcessorBase};
15use crate::dachshund::row::{Row, SimpleEdgeRow};
16use crate::dachshund::simple_transformer::GraphStatsTransformerBase;
17use crate::dachshund::simple_undirected_graph::SimpleUndirectedGraph;
18use crate::dachshund::simple_undirected_graph_builder::SimpleUndirectedGraphBuilder;
19use crate::dachshund::transformer_base::TransformerBase;
20use std::collections::HashMap;
21use std::sync::mpsc::Sender;
22use std::sync::Arc;
23
24pub struct CoreTransformer {
25    batch: Vec<SimpleEdgeRow>,
26    line_processor: Arc<LineProcessor>,
27}
28
29impl CoreTransformer {
30    pub fn new() -> Self {
31        Self {
32            batch: Vec::new(),
33            line_processor: Arc::new(LineProcessor::new()),
34        }
35    }
36    fn compute_coreness_and_anomalies(
37        graph: &SimpleUndirectedGraph,
38    ) -> (HashMap<NodeId, usize>, HashMap<NodeId, f64>) {
39        let (_, coreness) = graph.get_coreness();
40        let coreness_anomalies = graph.get_coreness_anomaly(&coreness);
41        (coreness, coreness_anomalies)
42    }
43}
44impl Default for CoreTransformer {
45    fn default() -> Self {
46        CoreTransformer::new()
47    }
48}
49impl GraphStatsTransformerBase for CoreTransformer {}
50
51impl TransformerBase for CoreTransformer {
52    fn get_line_processor(&self) -> Arc<dyn LineProcessorBase> {
53        self.line_processor.clone()
54    }
55    fn process_row(&mut self, row: Box<dyn Row>) -> CLQResult<()> {
56        self.batch.push(row.as_simple_edge_row().unwrap());
57        Ok(())
58    }
59    fn reset(&mut self) -> CLQResult<()> {
60        self.batch.clear();
61        Ok(())
62    }
63
64    fn process_batch(
65        &mut self,
66        graph_id: GraphId,
67        output: &Sender<(Option<String>, bool)>,
68    ) -> CLQResult<()> {
69        let tuples: Vec<(i64, i64)> = self.batch.iter().map(|x| x.as_tuple()).collect();
70        let mut builder = SimpleUndirectedGraphBuilder {};
71        let graph = builder.from_vector(tuples)?;
72        let (coreness_map, anomaly_map) = CoreTransformer::compute_coreness_and_anomalies(&graph);
73        let original_id = self
74            .line_processor
75            .get_original_id(graph_id.value() as usize);
76        let mut coreness: Vec<(NodeId, usize)> = coreness_map.into_iter().collect();
77        coreness.sort_by_key(|(_node_id, coreness)| *coreness);
78        for (node_id, node_coreness) in coreness {
79            let degree = graph.get_node_degree(node_id);
80            let anomaly = anomaly_map.get(&node_id).unwrap();
81            let line: String = format!(
82                "{}\t{}\t{}\t{}\t{}",
83                original_id,
84                node_id.value(),
85                node_coreness,
86                degree,
87                anomaly
88            );
89            output.send((Some(line), false)).unwrap();
90        }
91        Ok(())
92    }
93}