graphannis_core/graph/storage/
disk_adjacency.rs

1use super::*;
2
3use crate::{
4    annostorage::{ondisk::AnnoStorageImpl, NodeAnnotationStorage},
5    dfs::CycleSafeDFS,
6    errors::Result,
7    util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
8};
9use itertools::Itertools;
10use rustc_hash::FxHashSet;
11use std::collections::{BTreeSet, HashMap};
12use std::ops::Bound;
13use transient_btree_index::BtreeConfig;
14
/// Identifier returned by `DiskAdjacencyListStorage::serialization_id`.
pub const SERIALIZATION_ID: &str = "DiskAdjacencyListV1";
16
/// Graph storage that keeps its adjacency lists in disk-backed maps.
pub struct DiskAdjacencyListStorage {
    /// All edges of the component; the stored value is always `true`.
    edges: DiskMap<Edge, bool>,
    /// For every entry in `edges` the result of `Edge::inverse()`, used to
    /// answer queries for ingoing edges.
    inverse_edges: DiskMap<Edge, bool>,
    /// Annotations attached to the edges.
    annos: AnnoStorageImpl<Edge>,
    /// Statistics from the last `calculate_statistics` run (or copied/loaded
    /// ones); reset to `None` whenever an edge is added.
    stats: Option<GraphStatistic>,
}
23
24fn get_fan_outs(edges: &DiskMap<Edge, bool>) -> Result<Vec<usize>> {
25    let mut fan_outs: HashMap<NodeID, usize> = HashMap::default();
26    if !edges.is_empty()? {
27        let all_edges = edges.iter()?;
28        for e in all_edges {
29            let (e, _) = e?;
30            fan_outs
31                .entry(e.source)
32                .and_modify(|num_out| *num_out += 1)
33                .or_insert(1);
34        }
35    }
36    // order the fan-outs
37    let mut fan_outs: Vec<usize> = fan_outs.into_values().collect();
38    fan_outs.sort_unstable();
39
40    Ok(fan_outs)
41}
42
43impl DiskAdjacencyListStorage {
44    pub fn new() -> Result<DiskAdjacencyListStorage> {
45        Ok(DiskAdjacencyListStorage {
46            edges: DiskMap::default(),
47            inverse_edges: DiskMap::default(),
48            annos: AnnoStorageImpl::new(None)?,
49            stats: None,
50        })
51    }
52
53    pub fn clear(&mut self) -> Result<()> {
54        self.edges.clear();
55        self.inverse_edges.clear();
56        self.annos.clear()?;
57        self.stats = None;
58        Ok(())
59    }
60}
61
62impl EdgeContainer for DiskAdjacencyListStorage {
63    fn get_outgoing_edges<'a>(
64        &'a self,
65        node: NodeID,
66    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
67        let lower_bound = Edge {
68            source: node,
69            target: NodeID::MIN,
70        };
71        let upper_bound = Edge {
72            source: node,
73            target: NodeID::MAX,
74        };
75        Box::new(
76            self.edges
77                .range(lower_bound..upper_bound)
78                .map_ok(|(e, _)| e.target),
79        )
80    }
81
82    fn has_outgoing_edges(&self, node: NodeID) -> Result<bool> {
83        let lower_bound = Edge {
84            source: node,
85            target: NodeID::MIN,
86        };
87        let upper_bound = Edge {
88            source: node,
89            target: NodeID::MAX,
90        };
91        if let Some(edge) = self.edges.range(lower_bound..upper_bound).next() {
92            edge?;
93            Ok(true)
94        } else {
95            Ok(false)
96        }
97    }
98
99    fn get_ingoing_edges<'a>(
100        &'a self,
101        node: NodeID,
102    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
103        let lower_bound = Edge {
104            source: node,
105            target: NodeID::MIN,
106        };
107        let upper_bound = Edge {
108            source: node,
109            target: NodeID::MAX,
110        };
111        Box::new(
112            self.inverse_edges
113                .range(lower_bound..upper_bound)
114                .map_ok(|(e, _)| e.target),
115        )
116    }
117    fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
118        match self.edges.iter() {
119            // The unique_by will merge all errors into a single error, which should be ok for our use case
120            Ok(edges) => Box::new(edges.map_ok(|(e, _)| e.source).unique_by(|n| match n {
121                Ok(n) => Some(*n),
122                Err(_) => None,
123            })),
124            Err(e) => Box::new(std::iter::once(Err(e))),
125        }
126    }
127
    /// Returns the currently stored statistics, or `None` if there are none
    /// (e.g. after an edge was added or the storage was cleared).
    fn get_statistics(&self) -> Option<&GraphStatistic> {
        self.stats.as_ref()
    }
131}
132
133impl GraphStorage for DiskAdjacencyListStorage {
    /// Gives read access to the storage holding the edge annotations.
    fn get_anno_storage(&self) -> &dyn EdgeAnnotationStorage {
        &self.annos
    }
137
138    fn serialization_id(&self) -> String {
139        SERIALIZATION_ID.to_owned()
140    }
141
142    fn load_from(location: &Path) -> Result<Self>
143    where
144        Self: std::marker::Sized,
145    {
146        // Read stats
147        let stats_path = location.join("edge_stats.bin");
148        let f_stats = std::fs::File::open(stats_path)?;
149        let input = std::io::BufReader::new(f_stats);
150        let stats = bincode::deserialize_from(input)?;
151
152        let result = DiskAdjacencyListStorage {
153            edges: DiskMap::new(
154                Some(&location.join("edges.bin")),
155                EvictionStrategy::default(),
156                DEFAULT_BLOCK_CACHE_CAPACITY,
157                BtreeConfig::default()
158                    .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
159                    .fixed_value_size(2),
160            )?,
161            inverse_edges: DiskMap::new(
162                Some(&location.join("inverse_edges.bin")),
163                EvictionStrategy::default(),
164                DEFAULT_BLOCK_CACHE_CAPACITY,
165                BtreeConfig::default()
166                    .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
167                    .fixed_value_size(2),
168            )?,
169            annos: AnnoStorageImpl::new(Some(
170                location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
171            ))?,
172            stats,
173        };
174        Ok(result)
175    }
176
177    fn save_to(&self, location: &Path) -> Result<()> {
178        self.edges.write_to(&location.join("edges.bin"))?;
179        self.inverse_edges
180            .write_to(&location.join("inverse_edges.bin"))?;
181        self.annos.save_annotations_to(location)?;
182        // Write stats with bincode
183        let stats_path = location.join("edge_stats.bin");
184        let f_stats = std::fs::File::create(stats_path)?;
185        let mut writer = std::io::BufWriter::new(f_stats);
186        bincode::serialize_into(&mut writer, &self.stats)?;
187
188        Ok(())
189    }
190
191    fn find_connected<'a>(
192        &'a self,
193        node: NodeID,
194        min_distance: usize,
195        max_distance: Bound<usize>,
196    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
197        let mut visited = FxHashSet::<NodeID>::default();
198        let max_distance = match max_distance {
199            Bound::Unbounded => usize::MAX,
200            Bound::Included(max_distance) => max_distance,
201            Bound::Excluded(max_distance) => max_distance - 1,
202        };
203        let it = CycleSafeDFS::<'a>::new(self, node, min_distance, max_distance)
204            .map_ok(|x| x.node)
205            .filter_ok(move |n| visited.insert(*n));
206        Box::new(it)
207    }
208
209    fn find_connected_inverse<'a>(
210        &'a self,
211        node: NodeID,
212        min_distance: usize,
213        max_distance: Bound<usize>,
214    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
215        let mut visited = FxHashSet::<NodeID>::default();
216        let max_distance = match max_distance {
217            Bound::Unbounded => usize::MAX,
218            Bound::Included(max_distance) => max_distance,
219            Bound::Excluded(max_distance) => max_distance - 1,
220        };
221
222        let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
223            .map_ok(|x| x.node)
224            .filter_ok(move |n| visited.insert(*n));
225        Box::new(it)
226    }
227
228    fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
229        let mut it = CycleSafeDFS::new(self, source, usize::MIN, usize::MAX)
230            .filter_ok(|x| target == x.node)
231            .map_ok(|x| x.distance);
232
233        match it.next() {
234            Some(distance) => {
235                let distance = distance?;
236                Ok(Some(distance))
237            }
238            None => Ok(None),
239        }
240    }
241    fn is_connected(
242        &self,
243        source: NodeID,
244        target: NodeID,
245        min_distance: usize,
246        max_distance: std::ops::Bound<usize>,
247    ) -> Result<bool> {
248        let max_distance = match max_distance {
249            Bound::Unbounded => usize::MAX,
250            Bound::Included(max_distance) => max_distance,
251            Bound::Excluded(max_distance) => max_distance - 1,
252        };
253        let mut it = CycleSafeDFS::new(self, source, min_distance, max_distance)
254            .filter_ok(|x| target == x.node);
255
256        match it.next() {
257            Some(next) => {
258                if let Err(e) = next {
259                    Err(e)
260                } else {
261                    Ok(true)
262                }
263            }
264            None => Ok(false),
265        }
266    }
267
268    fn copy(
269        &mut self,
270        _node_annos: &dyn NodeAnnotationStorage,
271        orig: &dyn GraphStorage,
272    ) -> Result<()> {
273        self.clear()?;
274
275        for source in orig.source_nodes() {
276            let source = source?;
277            for target in orig.get_outgoing_edges(source) {
278                let target = target?;
279                let e = Edge { source, target };
280                self.add_edge(e.clone())?;
281                for a in orig.get_anno_storage().get_annotations_for_item(&e)? {
282                    self.add_edge_annotation(e.clone(), a)?;
283                }
284            }
285        }
286
287        self.stats = orig.get_statistics().cloned();
288        self.annos.calculate_statistics()?;
289        Ok(())
290    }
291
    /// This storage can be modified, so it always returns itself.
    fn as_writeable(&mut self) -> Option<&mut dyn WriteableGraphStorage> {
        Some(self)
    }
    /// Returns this storage as a plain `EdgeContainer` trait object.
    fn as_edgecontainer(&self) -> &dyn EdgeContainer {
        self
    }
298
    /// Always `true`: ingoing edges are looked up in their own map
    /// (`inverse_edges`) with the same kind of range query that is used for
    /// outgoing edges.
    fn inverse_has_same_cost(&self) -> bool {
        true
    }
302}
303
304impl WriteableGraphStorage for DiskAdjacencyListStorage {
305    fn add_edge(&mut self, edge: Edge) -> Result<()> {
306        if edge.source != edge.target {
307            // insert to both regular and inverse maps
308            self.inverse_edges.insert(edge.inverse(), true)?;
309            self.edges.insert(edge, true)?;
310            self.stats = None;
311        }
312        Ok(())
313    }
314
315    fn add_edge_annotation(&mut self, edge: Edge, anno: Annotation) -> Result<()> {
316        if self.edges.get(&edge)?.is_some() {
317            self.annos.insert(edge, anno)?;
318        }
319        Ok(())
320    }
321
322    fn delete_edge(&mut self, edge: &Edge) -> Result<()> {
323        self.edges.remove(edge)?;
324        self.inverse_edges.remove(&edge.inverse())?;
325
326        self.annos.remove_item(edge)?;
327
328        Ok(())
329    }
330    fn delete_edge_annotation(&mut self, edge: &Edge, anno_key: &AnnoKey) -> Result<()> {
331        self.annos.remove_annotation_for_item(edge, anno_key)?;
332        Ok(())
333    }
334    fn delete_node(&mut self, node: NodeID) -> Result<()> {
335        // find all both ingoing and outgoing edges
336        let mut to_delete = std::collections::LinkedList::<Edge>::new();
337
338        for target in self.get_outgoing_edges(node) {
339            let target = target?;
340            to_delete.push_back(Edge {
341                source: node,
342                target,
343            });
344        }
345
346        for source in self.get_ingoing_edges(node) {
347            let source = source?;
348            to_delete.push_back(Edge {
349                source,
350                target: node,
351            });
352        }
353
354        for e in to_delete {
355            self.delete_edge(&e)?;
356        }
357
358        Ok(())
359    }
360
361    fn calculate_statistics(&mut self) -> Result<()> {
362        let mut stats = GraphStatistic {
363            max_depth: 1,
364            max_fan_out: 0,
365            avg_fan_out: 0.0,
366            fan_out_99_percentile: 0,
367            inverse_fan_out_99_percentile: 0,
368            cyclic: false,
369            rooted_tree: true,
370            nodes: 0,
371            dfs_visit_ratio: 0.0,
372        };
373
374        self.annos.calculate_statistics()?;
375
376        let mut has_incoming_edge: BTreeSet<NodeID> = BTreeSet::new();
377
378        // find all root nodes
379        let mut roots: BTreeSet<NodeID> = BTreeSet::new();
380        {
381            let mut all_nodes: BTreeSet<NodeID> = BTreeSet::new();
382            for edge in self.edges.iter()? {
383                let (e, _) = edge?;
384                roots.insert(e.source);
385                all_nodes.insert(e.source);
386                all_nodes.insert(e.target);
387
388                if stats.rooted_tree {
389                    if has_incoming_edge.contains(&e.target) {
390                        stats.rooted_tree = false;
391                    } else {
392                        has_incoming_edge.insert(e.target);
393                    }
394                }
395            }
396            stats.nodes = all_nodes.len();
397        }
398
399        let edges_empty = self.edges.is_empty()?;
400
401        if !edges_empty {
402            for edge in self.edges.iter()? {
403                let (e, _) = edge?;
404                roots.remove(&e.target);
405            }
406        }
407
408        let fan_outs = get_fan_outs(&self.edges)?;
409        let sum_fan_out: usize = fan_outs.iter().sum();
410
411        if let Some(last) = fan_outs.last() {
412            stats.max_fan_out = *last;
413        }
414        let inverse_fan_outs = get_fan_outs(&self.inverse_edges)?;
415
416        // get the percentile value(s)
417        // set some default values in case there are not enough elements in the component
418        if !fan_outs.is_empty() {
419            stats.fan_out_99_percentile = fan_outs[fan_outs.len() - 1];
420        }
421        if !inverse_fan_outs.is_empty() {
422            stats.inverse_fan_out_99_percentile = inverse_fan_outs[inverse_fan_outs.len() - 1];
423        }
424        // calculate the more accurate values
425        if fan_outs.len() >= 100 {
426            let idx: usize = fan_outs.len() / 100;
427            if idx < fan_outs.len() {
428                stats.fan_out_99_percentile = fan_outs[idx];
429            }
430        }
431        if inverse_fan_outs.len() >= 100 {
432            let idx: usize = inverse_fan_outs.len() / 100;
433            if idx < inverse_fan_outs.len() {
434                stats.inverse_fan_out_99_percentile = inverse_fan_outs[idx];
435            }
436        }
437
438        let mut number_of_visits = 0;
439        if roots.is_empty() && !edges_empty {
440            // if we have edges but no roots at all there must be a cycle
441            stats.cyclic = true;
442        } else {
443            for root_node in &roots {
444                let mut dfs = CycleSafeDFS::new(self, *root_node, 0, usize::MAX);
445                for step in &mut dfs {
446                    let step = step?;
447                    number_of_visits += 1;
448                    stats.max_depth = std::cmp::max(stats.max_depth, step.distance);
449                }
450                if dfs.is_cyclic() {
451                    stats.cyclic = true;
452                }
453            }
454        }
455
456        if stats.cyclic {
457            stats.rooted_tree = false;
458            // it's infinite
459            stats.max_depth = 0;
460            stats.dfs_visit_ratio = 0.0;
461        } else if stats.nodes > 0 {
462            stats.dfs_visit_ratio = f64::from(number_of_visits) / (stats.nodes as f64);
463        }
464
465        if sum_fan_out > 0 && stats.nodes > 0 {
466            stats.avg_fan_out = (sum_fan_out as f64) / (stats.nodes as f64);
467        }
468
469        self.stats = Some(stats);
470
471        Ok(())
472    }
473
474    fn clear(&mut self) -> Result<()> {
475        self.annos.clear()?;
476        self.edges.clear();
477        self.inverse_edges.clear();
478        self.stats = None;
479        Ok(())
480    }
481}
482
483#[cfg(test)]
484mod tests;