// graphannis_core/graph/storage/disk_adjacency.rs
use super::*;
2
3use crate::{
4 annostorage::{ondisk::AnnoStorageImpl, NodeAnnotationStorage},
5 dfs::CycleSafeDFS,
6 errors::Result,
7 util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
8};
9use itertools::Itertools;
10use rustc_hash::FxHashSet;
11use std::collections::{BTreeSet, HashMap};
12use std::ops::Bound;
13use transient_btree_index::BtreeConfig;
14
/// Identifier string returned by `serialization_id()` of [`DiskAdjacencyListStorage`].
pub const SERIALIZATION_ID: &str = "DiskAdjacencyListV1";
16
/// Graph storage that keeps its adjacency lists in two [`DiskMap`]s, one per edge direction.
pub struct DiskAdjacencyListStorage {
    /// One entry per edge, keyed by the edge itself. The code in this file only ever inserts
    /// `true` as value, so the map is effectively used as an ordered set.
    edges: DiskMap<Edge, bool>,
    /// One entry per edge, keyed by `edge.inverse()`. Used to look up ingoing edges.
    inverse_edges: DiskMap<Edge, bool>,
    /// Annotations attached to the edges.
    annos: AnnoStorageImpl<Edge>,
    /// Cached statistics. `None` until they are calculated, loaded or copied, and reset to
    /// `None` when an edge is added or the storage is cleared.
    stats: Option<GraphStatistic>,
}
23
24fn get_fan_outs(edges: &DiskMap<Edge, bool>) -> Result<Vec<usize>> {
25 let mut fan_outs: HashMap<NodeID, usize> = HashMap::default();
26 if !edges.is_empty()? {
27 let all_edges = edges.iter()?;
28 for e in all_edges {
29 let (e, _) = e?;
30 fan_outs
31 .entry(e.source)
32 .and_modify(|num_out| *num_out += 1)
33 .or_insert(1);
34 }
35 }
36 let mut fan_outs: Vec<usize> = fan_outs.into_values().collect();
38 fan_outs.sort_unstable();
39
40 Ok(fan_outs)
41}
42
43impl DiskAdjacencyListStorage {
44 pub fn new() -> Result<DiskAdjacencyListStorage> {
45 Ok(DiskAdjacencyListStorage {
46 edges: DiskMap::default(),
47 inverse_edges: DiskMap::default(),
48 annos: AnnoStorageImpl::new(None)?,
49 stats: None,
50 })
51 }
52
53 pub fn clear(&mut self) -> Result<()> {
54 self.edges.clear();
55 self.inverse_edges.clear();
56 self.annos.clear()?;
57 self.stats = None;
58 Ok(())
59 }
60}
61
62impl EdgeContainer for DiskAdjacencyListStorage {
63 fn get_outgoing_edges<'a>(
64 &'a self,
65 node: NodeID,
66 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
67 let lower_bound = Edge {
68 source: node,
69 target: NodeID::MIN,
70 };
71 let upper_bound = Edge {
72 source: node,
73 target: NodeID::MAX,
74 };
75 Box::new(
76 self.edges
77 .range(lower_bound..upper_bound)
78 .map_ok(|(e, _)| e.target),
79 )
80 }
81
82 fn has_outgoing_edges(&self, node: NodeID) -> Result<bool> {
83 let lower_bound = Edge {
84 source: node,
85 target: NodeID::MIN,
86 };
87 let upper_bound = Edge {
88 source: node,
89 target: NodeID::MAX,
90 };
91 if let Some(edge) = self.edges.range(lower_bound..upper_bound).next() {
92 edge?;
93 Ok(true)
94 } else {
95 Ok(false)
96 }
97 }
98
99 fn get_ingoing_edges<'a>(
100 &'a self,
101 node: NodeID,
102 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
103 let lower_bound = Edge {
104 source: node,
105 target: NodeID::MIN,
106 };
107 let upper_bound = Edge {
108 source: node,
109 target: NodeID::MAX,
110 };
111 Box::new(
112 self.inverse_edges
113 .range(lower_bound..upper_bound)
114 .map_ok(|(e, _)| e.target),
115 )
116 }
117 fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
118 match self.edges.iter() {
119 Ok(edges) => Box::new(edges.map_ok(|(e, _)| e.source).unique_by(|n| match n {
121 Ok(n) => Some(*n),
122 Err(_) => None,
123 })),
124 Err(e) => Box::new(std::iter::once(Err(e))),
125 }
126 }
127
128 fn get_statistics(&self) -> Option<&GraphStatistic> {
129 self.stats.as_ref()
130 }
131}
132
133impl GraphStorage for DiskAdjacencyListStorage {
134 fn get_anno_storage(&self) -> &dyn EdgeAnnotationStorage {
135 &self.annos
136 }
137
138 fn serialization_id(&self) -> String {
139 SERIALIZATION_ID.to_owned()
140 }
141
142 fn load_from(location: &Path) -> Result<Self>
143 where
144 Self: std::marker::Sized,
145 {
146 let stats = load_statistics_from_location(location)?;
147 let result = DiskAdjacencyListStorage {
148 edges: DiskMap::new(
149 Some(&location.join("edges.bin")),
150 EvictionStrategy::default(),
151 DEFAULT_BLOCK_CACHE_CAPACITY,
152 BtreeConfig::default()
153 .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
154 .fixed_value_size(2),
155 )?,
156 inverse_edges: DiskMap::new(
157 Some(&location.join("inverse_edges.bin")),
158 EvictionStrategy::default(),
159 DEFAULT_BLOCK_CACHE_CAPACITY,
160 BtreeConfig::default()
161 .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
162 .fixed_value_size(2),
163 )?,
164 annos: AnnoStorageImpl::new(Some(
165 location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
166 ))?,
167 stats,
168 };
169 Ok(result)
170 }
171
172 fn save_to(&self, location: &Path) -> Result<()> {
173 self.edges.write_to(&location.join("edges.bin"))?;
174 self.inverse_edges
175 .write_to(&location.join("inverse_edges.bin"))?;
176 self.annos.save_annotations_to(location)?;
177 save_statistics_to_toml(location, self.stats.as_ref())?;
178 Ok(())
179 }
180
181 fn find_connected<'a>(
182 &'a self,
183 node: NodeID,
184 min_distance: usize,
185 max_distance: Bound<usize>,
186 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
187 let mut visited = FxHashSet::<NodeID>::default();
188 let max_distance = match max_distance {
189 Bound::Unbounded => usize::MAX,
190 Bound::Included(max_distance) => max_distance,
191 Bound::Excluded(max_distance) => max_distance - 1,
192 };
193 let it = CycleSafeDFS::<'a>::new(self, node, min_distance, max_distance)
194 .map_ok(|x| x.node)
195 .filter_ok(move |n| visited.insert(*n));
196 Box::new(it)
197 }
198
199 fn find_connected_inverse<'a>(
200 &'a self,
201 node: NodeID,
202 min_distance: usize,
203 max_distance: Bound<usize>,
204 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
205 let mut visited = FxHashSet::<NodeID>::default();
206 let max_distance = match max_distance {
207 Bound::Unbounded => usize::MAX,
208 Bound::Included(max_distance) => max_distance,
209 Bound::Excluded(max_distance) => max_distance - 1,
210 };
211
212 let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
213 .map_ok(|x| x.node)
214 .filter_ok(move |n| visited.insert(*n));
215 Box::new(it)
216 }
217
218 fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
219 let mut it = CycleSafeDFS::new(self, source, usize::MIN, usize::MAX)
220 .filter_ok(|x| target == x.node)
221 .map_ok(|x| x.distance);
222
223 match it.next() {
224 Some(distance) => {
225 let distance = distance?;
226 Ok(Some(distance))
227 }
228 None => Ok(None),
229 }
230 }
231 fn is_connected(
232 &self,
233 source: NodeID,
234 target: NodeID,
235 min_distance: usize,
236 max_distance: std::ops::Bound<usize>,
237 ) -> Result<bool> {
238 let max_distance = match max_distance {
239 Bound::Unbounded => usize::MAX,
240 Bound::Included(max_distance) => max_distance,
241 Bound::Excluded(max_distance) => max_distance - 1,
242 };
243 let mut it = CycleSafeDFS::new(self, source, min_distance, max_distance)
244 .filter_ok(|x| target == x.node);
245
246 match it.next() {
247 Some(next) => {
248 if let Err(e) = next {
249 Err(e)
250 } else {
251 Ok(true)
252 }
253 }
254 None => Ok(false),
255 }
256 }
257
258 fn copy(
259 &mut self,
260 _node_annos: &dyn NodeAnnotationStorage,
261 orig: &dyn GraphStorage,
262 ) -> Result<()> {
263 self.clear()?;
264
265 for source in orig.source_nodes() {
266 let source = source?;
267 for target in orig.get_outgoing_edges(source) {
268 let target = target?;
269 let e = Edge { source, target };
270 self.add_edge(e.clone())?;
271 for a in orig.get_anno_storage().get_annotations_for_item(&e)? {
272 self.add_edge_annotation(e.clone(), a)?;
273 }
274 }
275 }
276
277 self.stats = orig.get_statistics().cloned();
278 self.annos.calculate_statistics()?;
279 Ok(())
280 }
281
282 fn as_writeable(&mut self) -> Option<&mut dyn WriteableGraphStorage> {
283 Some(self)
284 }
285 fn as_edgecontainer(&self) -> &dyn EdgeContainer {
286 self
287 }
288
289 fn inverse_has_same_cost(&self) -> bool {
290 true
291 }
292}
293
294impl WriteableGraphStorage for DiskAdjacencyListStorage {
295 fn add_edge(&mut self, edge: Edge) -> Result<()> {
296 if edge.source != edge.target {
297 self.inverse_edges.insert(edge.inverse(), true)?;
299 self.edges.insert(edge, true)?;
300 self.stats = None;
301 }
302 Ok(())
303 }
304
305 fn add_edge_annotation(&mut self, edge: Edge, anno: Annotation) -> Result<()> {
306 if self.edges.get(&edge)?.is_some() {
307 self.annos.insert(edge, anno)?;
308 }
309 Ok(())
310 }
311
312 fn delete_edge(&mut self, edge: &Edge) -> Result<()> {
313 self.edges.remove(edge)?;
314 self.inverse_edges.remove(&edge.inverse())?;
315
316 self.annos.remove_item(edge)?;
317
318 Ok(())
319 }
320 fn delete_edge_annotation(&mut self, edge: &Edge, anno_key: &AnnoKey) -> Result<()> {
321 self.annos.remove_annotation_for_item(edge, anno_key)?;
322 Ok(())
323 }
324 fn delete_node(&mut self, node: NodeID) -> Result<()> {
325 let mut to_delete = std::collections::LinkedList::<Edge>::new();
327
328 for target in self.get_outgoing_edges(node) {
329 let target = target?;
330 to_delete.push_back(Edge {
331 source: node,
332 target,
333 });
334 }
335
336 for source in self.get_ingoing_edges(node) {
337 let source = source?;
338 to_delete.push_back(Edge {
339 source,
340 target: node,
341 });
342 }
343
344 for e in to_delete {
345 self.delete_edge(&e)?;
346 }
347
348 Ok(())
349 }
350
351 fn calculate_statistics(&mut self) -> Result<()> {
352 let mut stats = GraphStatistic {
353 max_depth: 1,
354 max_fan_out: 0,
355 avg_fan_out: 0.0,
356 fan_out_99_percentile: 0,
357 inverse_fan_out_99_percentile: 0,
358 cyclic: false,
359 rooted_tree: true,
360 nodes: 0,
361 root_nodes: 0,
362 dfs_visit_ratio: 0.0,
363 };
364
365 self.annos.calculate_statistics()?;
366
367 let mut has_incoming_edge: BTreeSet<NodeID> = BTreeSet::new();
368
369 let mut roots: BTreeSet<NodeID> = BTreeSet::new();
371 {
372 let mut all_nodes: BTreeSet<NodeID> = BTreeSet::new();
373 for edge in self.edges.iter()? {
374 let (e, _) = edge?;
375 roots.insert(e.source);
376 all_nodes.insert(e.source);
377 all_nodes.insert(e.target);
378
379 if stats.rooted_tree {
380 if has_incoming_edge.contains(&e.target) {
381 stats.rooted_tree = false;
382 } else {
383 has_incoming_edge.insert(e.target);
384 }
385 }
386 }
387 stats.nodes = all_nodes.len();
388 }
389
390 let edges_empty = self.edges.is_empty()?;
391
392 if !edges_empty {
393 for edge in self.edges.iter()? {
394 let (e, _) = edge?;
395 roots.remove(&e.target);
396 }
397 }
398 stats.root_nodes = roots.len();
399
400 let fan_outs = get_fan_outs(&self.edges)?;
401 let sum_fan_out: usize = fan_outs.iter().sum();
402
403 if let Some(last) = fan_outs.last() {
404 stats.max_fan_out = *last;
405 }
406 let inverse_fan_outs = get_fan_outs(&self.inverse_edges)?;
407
408 if !fan_outs.is_empty() {
411 stats.fan_out_99_percentile = fan_outs[fan_outs.len() - 1];
412 }
413 if !inverse_fan_outs.is_empty() {
414 stats.inverse_fan_out_99_percentile = inverse_fan_outs[inverse_fan_outs.len() - 1];
415 }
416 if fan_outs.len() >= 100 {
418 let idx: usize = fan_outs.len() / 100;
419 if idx < fan_outs.len() {
420 stats.fan_out_99_percentile = fan_outs[idx];
421 }
422 }
423 if inverse_fan_outs.len() >= 100 {
424 let idx: usize = inverse_fan_outs.len() / 100;
425 if idx < inverse_fan_outs.len() {
426 stats.inverse_fan_out_99_percentile = inverse_fan_outs[idx];
427 }
428 }
429
430 let mut number_of_visits = 0;
431 if roots.is_empty() && !edges_empty {
432 stats.cyclic = true;
434 } else {
435 for root_node in &roots {
436 let mut dfs = CycleSafeDFS::new(self, *root_node, 0, usize::MAX);
437 for step in &mut dfs {
438 let step = step?;
439 number_of_visits += 1;
440 stats.max_depth = std::cmp::max(stats.max_depth, step.distance);
441 }
442 if dfs.is_cyclic() {
443 stats.cyclic = true;
444 }
445 }
446 }
447
448 if stats.cyclic {
449 stats.rooted_tree = false;
450 stats.max_depth = 0;
452 stats.dfs_visit_ratio = 0.0;
453 } else if stats.nodes > 0 {
454 stats.dfs_visit_ratio = f64::from(number_of_visits) / (stats.nodes as f64);
455 }
456
457 if sum_fan_out > 0 && stats.nodes > 0 {
458 stats.avg_fan_out = (sum_fan_out as f64) / (stats.nodes as f64);
459 }
460
461 self.stats = Some(stats);
462
463 Ok(())
464 }
465
466 fn clear(&mut self) -> Result<()> {
467 self.annos.clear()?;
468 self.edges.clear();
469 self.inverse_edges.clear();
470 self.stats = None;
471 Ok(())
472 }
473}
474
475#[cfg(test)]
476mod tests;