graphannis_core/graph/storage/
disk_adjacency.rs

1use super::*;
2
3use crate::{
4    annostorage::{ondisk::AnnoStorageImpl, NodeAnnotationStorage},
5    dfs::CycleSafeDFS,
6    errors::Result,
7    util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
8};
9use itertools::Itertools;
10use rustc_hash::FxHashSet;
11use std::collections::{BTreeSet, HashMap};
12use std::ops::Bound;
13use transient_btree_index::BtreeConfig;
14
/// Identifier string that `DiskAdjacencyListStorage::serialization_id` reports
/// for this graph storage implementation.
15pub const SERIALIZATION_ID: &str = "DiskAdjacencyListV1";
16
/// Graph storage that keeps its adjacency information in `DiskMap`s.
///
/// Every edge is stored twice: once as-is in `edges` and once inverted in
/// `inverse_edges`, so that both outgoing and ingoing edges of a node can be
/// looked up with a range query on the respective map.
17pub struct DiskAdjacencyListStorage {
    /// All edges of the component. The `bool` value is always written as `true`
    /// by this file; only the presence of the key is used.
18    edges: DiskMap<Edge, bool>,
    /// The inverted form (`Edge::inverse`) of every entry in `edges`.
19    inverse_edges: DiskMap<Edge, bool>,
    /// Annotations attached to the edges.
20    annos: AnnoStorageImpl<Edge>,
    /// Cached statistics; `None` when they have not been calculated or were
    /// invalidated (e.g. by adding an edge or clearing the storage).
21    stats: Option<GraphStatistic>,
22}
23
24fn get_fan_outs(edges: &DiskMap<Edge, bool>) -> Result<Vec<usize>> {
25    let mut fan_outs: HashMap<NodeID, usize> = HashMap::default();
26    if !edges.is_empty()? {
27        let all_edges = edges.iter()?;
28        for e in all_edges {
29            let (e, _) = e?;
30            fan_outs
31                .entry(e.source)
32                .and_modify(|num_out| *num_out += 1)
33                .or_insert(1);
34        }
35    }
36    // order the fan-outs
37    let mut fan_outs: Vec<usize> = fan_outs.into_values().collect();
38    fan_outs.sort_unstable();
39
40    Ok(fan_outs)
41}
42
43impl DiskAdjacencyListStorage {
44    pub fn new() -> Result<DiskAdjacencyListStorage> {
45        Ok(DiskAdjacencyListStorage {
46            edges: DiskMap::default(),
47            inverse_edges: DiskMap::default(),
48            annos: AnnoStorageImpl::new(None)?,
49            stats: None,
50        })
51    }
52
53    pub fn clear(&mut self) -> Result<()> {
54        self.edges.clear();
55        self.inverse_edges.clear();
56        self.annos.clear()?;
57        self.stats = None;
58        Ok(())
59    }
60}
61
62impl EdgeContainer for DiskAdjacencyListStorage {
63    fn get_outgoing_edges<'a>(
64        &'a self,
65        node: NodeID,
66    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
67        let lower_bound = Edge {
68            source: node,
69            target: NodeID::MIN,
70        };
71        let upper_bound = Edge {
72            source: node,
73            target: NodeID::MAX,
74        };
75        Box::new(
76            self.edges
77                .range(lower_bound..upper_bound)
78                .map_ok(|(e, _)| e.target),
79        )
80    }
81
82    fn has_outgoing_edges(&self, node: NodeID) -> Result<bool> {
83        let lower_bound = Edge {
84            source: node,
85            target: NodeID::MIN,
86        };
87        let upper_bound = Edge {
88            source: node,
89            target: NodeID::MAX,
90        };
91        if let Some(edge) = self.edges.range(lower_bound..upper_bound).next() {
92            edge?;
93            Ok(true)
94        } else {
95            Ok(false)
96        }
97    }
98
99    fn get_ingoing_edges<'a>(
100        &'a self,
101        node: NodeID,
102    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
103        let lower_bound = Edge {
104            source: node,
105            target: NodeID::MIN,
106        };
107        let upper_bound = Edge {
108            source: node,
109            target: NodeID::MAX,
110        };
111        Box::new(
112            self.inverse_edges
113                .range(lower_bound..upper_bound)
114                .map_ok(|(e, _)| e.target),
115        )
116    }
117    fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
118        match self.edges.iter() {
119            // The unique_by will merge all errors into a single error, which should be ok for our use case
120            Ok(edges) => Box::new(edges.map_ok(|(e, _)| e.source).unique_by(|n| match n {
121                Ok(n) => Some(*n),
122                Err(_) => None,
123            })),
124            Err(e) => Box::new(std::iter::once(Err(e))),
125        }
126    }
127
    /// Returns a reference to the cached statistics, or `None` if none are
    /// currently stored (the `stats` field is `None`).
128    fn get_statistics(&self) -> Option<&GraphStatistic> {
129        self.stats.as_ref()
130    }
131}
132
133impl GraphStorage for DiskAdjacencyListStorage {
134    fn get_anno_storage(&self) -> &dyn EdgeAnnotationStorage {
135        &self.annos
136    }
137
138    fn serialization_id(&self) -> String {
139        SERIALIZATION_ID.to_owned()
140    }
141
142    fn load_from(location: &Path) -> Result<Self>
143    where
144        Self: std::marker::Sized,
145    {
146        let stats = load_statistics_from_location(location)?;
147        let result = DiskAdjacencyListStorage {
148            edges: DiskMap::new(
149                Some(&location.join("edges.bin")),
150                EvictionStrategy::default(),
151                DEFAULT_BLOCK_CACHE_CAPACITY,
152                BtreeConfig::default()
153                    .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
154                    .fixed_value_size(2),
155            )?,
156            inverse_edges: DiskMap::new(
157                Some(&location.join("inverse_edges.bin")),
158                EvictionStrategy::default(),
159                DEFAULT_BLOCK_CACHE_CAPACITY,
160                BtreeConfig::default()
161                    .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
162                    .fixed_value_size(2),
163            )?,
164            annos: AnnoStorageImpl::new(Some(
165                location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
166            ))?,
167            stats,
168        };
169        Ok(result)
170    }
171
172    fn save_to(&self, location: &Path) -> Result<()> {
173        self.edges.write_to(&location.join("edges.bin"))?;
174        self.inverse_edges
175            .write_to(&location.join("inverse_edges.bin"))?;
176        self.annos.save_annotations_to(location)?;
177        save_statistics_to_toml(location, self.stats.as_ref())?;
178        Ok(())
179    }
180
181    fn find_connected<'a>(
182        &'a self,
183        node: NodeID,
184        min_distance: usize,
185        max_distance: Bound<usize>,
186    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
187        let mut visited = FxHashSet::<NodeID>::default();
188        let max_distance = match max_distance {
189            Bound::Unbounded => usize::MAX,
190            Bound::Included(max_distance) => max_distance,
191            Bound::Excluded(max_distance) => max_distance - 1,
192        };
193        let it = CycleSafeDFS::<'a>::new(self, node, min_distance, max_distance)
194            .map_ok(|x| x.node)
195            .filter_ok(move |n| visited.insert(*n));
196        Box::new(it)
197    }
198
199    fn find_connected_inverse<'a>(
200        &'a self,
201        node: NodeID,
202        min_distance: usize,
203        max_distance: Bound<usize>,
204    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
205        let mut visited = FxHashSet::<NodeID>::default();
206        let max_distance = match max_distance {
207            Bound::Unbounded => usize::MAX,
208            Bound::Included(max_distance) => max_distance,
209            Bound::Excluded(max_distance) => max_distance - 1,
210        };
211
212        let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
213            .map_ok(|x| x.node)
214            .filter_ok(move |n| visited.insert(*n));
215        Box::new(it)
216    }
217
218    fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
219        let mut it = CycleSafeDFS::new(self, source, usize::MIN, usize::MAX)
220            .filter_ok(|x| target == x.node)
221            .map_ok(|x| x.distance);
222
223        match it.next() {
224            Some(distance) => {
225                let distance = distance?;
226                Ok(Some(distance))
227            }
228            None => Ok(None),
229        }
230    }
231    fn is_connected(
232        &self,
233        source: NodeID,
234        target: NodeID,
235        min_distance: usize,
236        max_distance: std::ops::Bound<usize>,
237    ) -> Result<bool> {
238        let max_distance = match max_distance {
239            Bound::Unbounded => usize::MAX,
240            Bound::Included(max_distance) => max_distance,
241            Bound::Excluded(max_distance) => max_distance - 1,
242        };
243        let mut it = CycleSafeDFS::new(self, source, min_distance, max_distance)
244            .filter_ok(|x| target == x.node);
245
246        match it.next() {
247            Some(next) => {
248                if let Err(e) = next {
249                    Err(e)
250                } else {
251                    Ok(true)
252                }
253            }
254            None => Ok(false),
255        }
256    }
257
258    fn copy(
259        &mut self,
260        _node_annos: &dyn NodeAnnotationStorage,
261        orig: &dyn GraphStorage,
262    ) -> Result<()> {
263        self.clear()?;
264
265        for source in orig.source_nodes() {
266            let source = source?;
267            for target in orig.get_outgoing_edges(source) {
268                let target = target?;
269                let e = Edge { source, target };
270                self.add_edge(e.clone())?;
271                for a in orig.get_anno_storage().get_annotations_for_item(&e)? {
272                    self.add_edge_annotation(e.clone(), a)?;
273                }
274            }
275        }
276
277        self.stats = orig.get_statistics().cloned();
278        self.annos.calculate_statistics()?;
279        Ok(())
280    }
281
282    fn as_writeable(&mut self) -> Option<&mut dyn WriteableGraphStorage> {
283        Some(self)
284    }
285    fn as_edgecontainer(&self) -> &dyn EdgeContainer {
286        self
287    }
288
    /// Always returns `true`: ingoing edges are looked up in `inverse_edges`
    /// with the same kind of range query that is used for outgoing edges on
    /// `edges`.
289    fn inverse_has_same_cost(&self) -> bool {
290        true
291    }
292}
293
294impl WriteableGraphStorage for DiskAdjacencyListStorage {
295    fn add_edge(&mut self, edge: Edge) -> Result<()> {
296        if edge.source != edge.target {
297            // insert to both regular and inverse maps
298            self.inverse_edges.insert(edge.inverse(), true)?;
299            self.edges.insert(edge, true)?;
300            self.stats = None;
301        }
302        Ok(())
303    }
304
305    fn add_edge_annotation(&mut self, edge: Edge, anno: Annotation) -> Result<()> {
306        if self.edges.get(&edge)?.is_some() {
307            self.annos.insert(edge, anno)?;
308        }
309        Ok(())
310    }
311
312    fn delete_edge(&mut self, edge: &Edge) -> Result<()> {
313        self.edges.remove(edge)?;
314        self.inverse_edges.remove(&edge.inverse())?;
315
316        self.annos.remove_item(edge)?;
317
318        Ok(())
319    }
320    fn delete_edge_annotation(&mut self, edge: &Edge, anno_key: &AnnoKey) -> Result<()> {
321        self.annos.remove_annotation_for_item(edge, anno_key)?;
322        Ok(())
323    }
324    fn delete_node(&mut self, node: NodeID) -> Result<()> {
325        // find all both ingoing and outgoing edges
326        let mut to_delete = std::collections::LinkedList::<Edge>::new();
327
328        for target in self.get_outgoing_edges(node) {
329            let target = target?;
330            to_delete.push_back(Edge {
331                source: node,
332                target,
333            });
334        }
335
336        for source in self.get_ingoing_edges(node) {
337            let source = source?;
338            to_delete.push_back(Edge {
339                source,
340                target: node,
341            });
342        }
343
344        for e in to_delete {
345            self.delete_edge(&e)?;
346        }
347
348        Ok(())
349    }
350
351    fn calculate_statistics(&mut self) -> Result<()> {
352        let mut stats = GraphStatistic {
353            max_depth: 1,
354            max_fan_out: 0,
355            avg_fan_out: 0.0,
356            fan_out_99_percentile: 0,
357            inverse_fan_out_99_percentile: 0,
358            cyclic: false,
359            rooted_tree: true,
360            nodes: 0,
361            root_nodes: 0,
362            dfs_visit_ratio: 0.0,
363        };
364
365        self.annos.calculate_statistics()?;
366
367        let mut has_incoming_edge: BTreeSet<NodeID> = BTreeSet::new();
368
369        // find all root nodes
370        let mut roots: BTreeSet<NodeID> = BTreeSet::new();
371        {
372            let mut all_nodes: BTreeSet<NodeID> = BTreeSet::new();
373            for edge in self.edges.iter()? {
374                let (e, _) = edge?;
375                roots.insert(e.source);
376                all_nodes.insert(e.source);
377                all_nodes.insert(e.target);
378
379                if stats.rooted_tree {
380                    if has_incoming_edge.contains(&e.target) {
381                        stats.rooted_tree = false;
382                    } else {
383                        has_incoming_edge.insert(e.target);
384                    }
385                }
386            }
387            stats.nodes = all_nodes.len();
388        }
389
390        let edges_empty = self.edges.is_empty()?;
391
392        if !edges_empty {
393            for edge in self.edges.iter()? {
394                let (e, _) = edge?;
395                roots.remove(&e.target);
396            }
397        }
398        stats.root_nodes = roots.len();
399
400        let fan_outs = get_fan_outs(&self.edges)?;
401        let sum_fan_out: usize = fan_outs.iter().sum();
402
403        if let Some(last) = fan_outs.last() {
404            stats.max_fan_out = *last;
405        }
406        let inverse_fan_outs = get_fan_outs(&self.inverse_edges)?;
407
408        // get the percentile value(s)
409        // set some default values in case there are not enough elements in the component
410        if !fan_outs.is_empty() {
411            stats.fan_out_99_percentile = fan_outs[fan_outs.len() - 1];
412        }
413        if !inverse_fan_outs.is_empty() {
414            stats.inverse_fan_out_99_percentile = inverse_fan_outs[inverse_fan_outs.len() - 1];
415        }
416        // calculate the more accurate values
417        if fan_outs.len() >= 100 {
418            let idx: usize = fan_outs.len() / 100;
419            if idx < fan_outs.len() {
420                stats.fan_out_99_percentile = fan_outs[idx];
421            }
422        }
423        if inverse_fan_outs.len() >= 100 {
424            let idx: usize = inverse_fan_outs.len() / 100;
425            if idx < inverse_fan_outs.len() {
426                stats.inverse_fan_out_99_percentile = inverse_fan_outs[idx];
427            }
428        }
429
430        let mut number_of_visits = 0;
431        if roots.is_empty() && !edges_empty {
432            // if we have edges but no roots at all there must be a cycle
433            stats.cyclic = true;
434        } else {
435            for root_node in &roots {
436                let mut dfs = CycleSafeDFS::new(self, *root_node, 0, usize::MAX);
437                for step in &mut dfs {
438                    let step = step?;
439                    number_of_visits += 1;
440                    stats.max_depth = std::cmp::max(stats.max_depth, step.distance);
441                }
442                if dfs.is_cyclic() {
443                    stats.cyclic = true;
444                }
445            }
446        }
447
448        if stats.cyclic {
449            stats.rooted_tree = false;
450            // it's infinite
451            stats.max_depth = 0;
452            stats.dfs_visit_ratio = 0.0;
453        } else if stats.nodes > 0 {
454            stats.dfs_visit_ratio = f64::from(number_of_visits) / (stats.nodes as f64);
455        }
456
457        if sum_fan_out > 0 && stats.nodes > 0 {
458            stats.avg_fan_out = (sum_fan_out as f64) / (stats.nodes as f64);
459        }
460
461        self.stats = Some(stats);
462
463        Ok(())
464    }
465
466    fn clear(&mut self) -> Result<()> {
467        self.annos.clear()?;
468        self.edges.clear();
469        self.inverse_edges.clear();
470        self.stats = None;
471        Ok(())
472    }
473}
474
475#[cfg(test)]
476mod tests;