lib_dachshund/dachshund/
core_transformer.rs1extern crate clap;
8extern crate serde_json;
9
10use crate::dachshund::algorithms::coreness::Coreness;
11use crate::dachshund::error::CLQResult;
12use crate::dachshund::graph_builder_base::GraphBuilderBase;
13use crate::dachshund::id_types::{GraphId, NodeId};
14use crate::dachshund::line_processor::{LineProcessor, LineProcessorBase};
15use crate::dachshund::row::{Row, SimpleEdgeRow};
16use crate::dachshund::simple_transformer::GraphStatsTransformerBase;
17use crate::dachshund::simple_undirected_graph::SimpleUndirectedGraph;
18use crate::dachshund::simple_undirected_graph_builder::SimpleUndirectedGraphBuilder;
19use crate::dachshund::transformer_base::TransformerBase;
20use std::collections::HashMap;
21use std::sync::mpsc::Sender;
22use std::sync::Arc;
23
24pub struct CoreTransformer {
25 batch: Vec<SimpleEdgeRow>,
26 line_processor: Arc<LineProcessor>,
27}
28
29impl CoreTransformer {
30 pub fn new() -> Self {
31 Self {
32 batch: Vec::new(),
33 line_processor: Arc::new(LineProcessor::new()),
34 }
35 }
36 fn compute_coreness_and_anomalies(
37 graph: &SimpleUndirectedGraph,
38 ) -> (HashMap<NodeId, usize>, HashMap<NodeId, f64>) {
39 let (_, coreness) = graph.get_coreness();
40 let coreness_anomalies = graph.get_coreness_anomaly(&coreness);
41 (coreness, coreness_anomalies)
42 }
43}
44impl Default for CoreTransformer {
45 fn default() -> Self {
46 CoreTransformer::new()
47 }
48}
49impl GraphStatsTransformerBase for CoreTransformer {}
50
51impl TransformerBase for CoreTransformer {
52 fn get_line_processor(&self) -> Arc<dyn LineProcessorBase> {
53 self.line_processor.clone()
54 }
55 fn process_row(&mut self, row: Box<dyn Row>) -> CLQResult<()> {
56 self.batch.push(row.as_simple_edge_row().unwrap());
57 Ok(())
58 }
59 fn reset(&mut self) -> CLQResult<()> {
60 self.batch.clear();
61 Ok(())
62 }
63
64 fn process_batch(
65 &mut self,
66 graph_id: GraphId,
67 output: &Sender<(Option<String>, bool)>,
68 ) -> CLQResult<()> {
69 let tuples: Vec<(i64, i64)> = self.batch.iter().map(|x| x.as_tuple()).collect();
70 let mut builder = SimpleUndirectedGraphBuilder {};
71 let graph = builder.from_vector(tuples)?;
72 let (coreness_map, anomaly_map) = CoreTransformer::compute_coreness_and_anomalies(&graph);
73 let original_id = self
74 .line_processor
75 .get_original_id(graph_id.value() as usize);
76 let mut coreness: Vec<(NodeId, usize)> = coreness_map.into_iter().collect();
77 coreness.sort_by_key(|(_node_id, coreness)| *coreness);
78 for (node_id, node_coreness) in coreness {
79 let degree = graph.get_node_degree(node_id);
80 let anomaly = anomaly_map.get(&node_id).unwrap();
81 let line: String = format!(
82 "{}\t{}\t{}\t{}\t{}",
83 original_id,
84 node_id.value(),
85 node_coreness,
86 degree,
87 anomaly
88 );
89 output.send((Some(line), false)).unwrap();
90 }
91 Ok(())
92 }
93}