graphannis_core/graph/storage/
disk_adjacency.rs1use super::*;
2
3use crate::{
4 annostorage::{ondisk::AnnoStorageImpl, NodeAnnotationStorage},
5 dfs::CycleSafeDFS,
6 errors::Result,
7 util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
8};
9use itertools::Itertools;
10use rustc_hash::FxHashSet;
11use std::collections::{BTreeSet, HashMap};
12use std::ops::Bound;
13use transient_btree_index::BtreeConfig;
14
15pub const SERIALIZATION_ID: &str = "DiskAdjacencyListV1";
16
17pub struct DiskAdjacencyListStorage {
18 edges: DiskMap<Edge, bool>,
19 inverse_edges: DiskMap<Edge, bool>,
20 annos: AnnoStorageImpl<Edge>,
21 stats: Option<GraphStatistic>,
22}
23
24fn get_fan_outs(edges: &DiskMap<Edge, bool>) -> Result<Vec<usize>> {
25 let mut fan_outs: HashMap<NodeID, usize> = HashMap::default();
26 if !edges.is_empty()? {
27 let all_edges = edges.iter()?;
28 for e in all_edges {
29 let (e, _) = e?;
30 fan_outs
31 .entry(e.source)
32 .and_modify(|num_out| *num_out += 1)
33 .or_insert(1);
34 }
35 }
36 let mut fan_outs: Vec<usize> = fan_outs.into_values().collect();
38 fan_outs.sort_unstable();
39
40 Ok(fan_outs)
41}
42
43impl DiskAdjacencyListStorage {
44 pub fn new() -> Result<DiskAdjacencyListStorage> {
45 Ok(DiskAdjacencyListStorage {
46 edges: DiskMap::default(),
47 inverse_edges: DiskMap::default(),
48 annos: AnnoStorageImpl::new(None)?,
49 stats: None,
50 })
51 }
52
53 pub fn clear(&mut self) -> Result<()> {
54 self.edges.clear();
55 self.inverse_edges.clear();
56 self.annos.clear()?;
57 self.stats = None;
58 Ok(())
59 }
60}
61
62impl EdgeContainer for DiskAdjacencyListStorage {
63 fn get_outgoing_edges<'a>(
64 &'a self,
65 node: NodeID,
66 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
67 let lower_bound = Edge {
68 source: node,
69 target: NodeID::MIN,
70 };
71 let upper_bound = Edge {
72 source: node,
73 target: NodeID::MAX,
74 };
75 Box::new(
76 self.edges
77 .range(lower_bound..upper_bound)
78 .map_ok(|(e, _)| e.target),
79 )
80 }
81
82 fn has_outgoing_edges(&self, node: NodeID) -> Result<bool> {
83 let lower_bound = Edge {
84 source: node,
85 target: NodeID::MIN,
86 };
87 let upper_bound = Edge {
88 source: node,
89 target: NodeID::MAX,
90 };
91 if let Some(edge) = self.edges.range(lower_bound..upper_bound).next() {
92 edge?;
93 Ok(true)
94 } else {
95 Ok(false)
96 }
97 }
98
99 fn get_ingoing_edges<'a>(
100 &'a self,
101 node: NodeID,
102 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
103 let lower_bound = Edge {
104 source: node,
105 target: NodeID::MIN,
106 };
107 let upper_bound = Edge {
108 source: node,
109 target: NodeID::MAX,
110 };
111 Box::new(
112 self.inverse_edges
113 .range(lower_bound..upper_bound)
114 .map_ok(|(e, _)| e.target),
115 )
116 }
117 fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
118 match self.edges.iter() {
119 Ok(edges) => Box::new(edges.map_ok(|(e, _)| e.source).unique_by(|n| match n {
121 Ok(n) => Some(*n),
122 Err(_) => None,
123 })),
124 Err(e) => Box::new(std::iter::once(Err(e))),
125 }
126 }
127
128 fn get_statistics(&self) -> Option<&GraphStatistic> {
129 self.stats.as_ref()
130 }
131}
132
133impl GraphStorage for DiskAdjacencyListStorage {
134 fn get_anno_storage(&self) -> &dyn EdgeAnnotationStorage {
135 &self.annos
136 }
137
138 fn serialization_id(&self) -> String {
139 SERIALIZATION_ID.to_owned()
140 }
141
142 fn load_from(location: &Path) -> Result<Self>
143 where
144 Self: std::marker::Sized,
145 {
146 let stats_path = location.join("edge_stats.bin");
148 let f_stats = std::fs::File::open(stats_path)?;
149 let input = std::io::BufReader::new(f_stats);
150 let stats = bincode::deserialize_from(input)?;
151
152 let result = DiskAdjacencyListStorage {
153 edges: DiskMap::new(
154 Some(&location.join("edges.bin")),
155 EvictionStrategy::default(),
156 DEFAULT_BLOCK_CACHE_CAPACITY,
157 BtreeConfig::default()
158 .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
159 .fixed_value_size(2),
160 )?,
161 inverse_edges: DiskMap::new(
162 Some(&location.join("inverse_edges.bin")),
163 EvictionStrategy::default(),
164 DEFAULT_BLOCK_CACHE_CAPACITY,
165 BtreeConfig::default()
166 .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
167 .fixed_value_size(2),
168 )?,
169 annos: AnnoStorageImpl::new(Some(
170 location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
171 ))?,
172 stats,
173 };
174 Ok(result)
175 }
176
177 fn save_to(&self, location: &Path) -> Result<()> {
178 self.edges.write_to(&location.join("edges.bin"))?;
179 self.inverse_edges
180 .write_to(&location.join("inverse_edges.bin"))?;
181 self.annos.save_annotations_to(location)?;
182 let stats_path = location.join("edge_stats.bin");
184 let f_stats = std::fs::File::create(stats_path)?;
185 let mut writer = std::io::BufWriter::new(f_stats);
186 bincode::serialize_into(&mut writer, &self.stats)?;
187
188 Ok(())
189 }
190
191 fn find_connected<'a>(
192 &'a self,
193 node: NodeID,
194 min_distance: usize,
195 max_distance: Bound<usize>,
196 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
197 let mut visited = FxHashSet::<NodeID>::default();
198 let max_distance = match max_distance {
199 Bound::Unbounded => usize::MAX,
200 Bound::Included(max_distance) => max_distance,
201 Bound::Excluded(max_distance) => max_distance - 1,
202 };
203 let it = CycleSafeDFS::<'a>::new(self, node, min_distance, max_distance)
204 .map_ok(|x| x.node)
205 .filter_ok(move |n| visited.insert(*n));
206 Box::new(it)
207 }
208
209 fn find_connected_inverse<'a>(
210 &'a self,
211 node: NodeID,
212 min_distance: usize,
213 max_distance: Bound<usize>,
214 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
215 let mut visited = FxHashSet::<NodeID>::default();
216 let max_distance = match max_distance {
217 Bound::Unbounded => usize::MAX,
218 Bound::Included(max_distance) => max_distance,
219 Bound::Excluded(max_distance) => max_distance - 1,
220 };
221
222 let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
223 .map_ok(|x| x.node)
224 .filter_ok(move |n| visited.insert(*n));
225 Box::new(it)
226 }
227
228 fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
229 let mut it = CycleSafeDFS::new(self, source, usize::MIN, usize::MAX)
230 .filter_ok(|x| target == x.node)
231 .map_ok(|x| x.distance);
232
233 match it.next() {
234 Some(distance) => {
235 let distance = distance?;
236 Ok(Some(distance))
237 }
238 None => Ok(None),
239 }
240 }
241 fn is_connected(
242 &self,
243 source: NodeID,
244 target: NodeID,
245 min_distance: usize,
246 max_distance: std::ops::Bound<usize>,
247 ) -> Result<bool> {
248 let max_distance = match max_distance {
249 Bound::Unbounded => usize::MAX,
250 Bound::Included(max_distance) => max_distance,
251 Bound::Excluded(max_distance) => max_distance - 1,
252 };
253 let mut it = CycleSafeDFS::new(self, source, min_distance, max_distance)
254 .filter_ok(|x| target == x.node);
255
256 match it.next() {
257 Some(next) => {
258 if let Err(e) = next {
259 Err(e)
260 } else {
261 Ok(true)
262 }
263 }
264 None => Ok(false),
265 }
266 }
267
268 fn copy(
269 &mut self,
270 _node_annos: &dyn NodeAnnotationStorage,
271 orig: &dyn GraphStorage,
272 ) -> Result<()> {
273 self.clear()?;
274
275 for source in orig.source_nodes() {
276 let source = source?;
277 for target in orig.get_outgoing_edges(source) {
278 let target = target?;
279 let e = Edge { source, target };
280 self.add_edge(e.clone())?;
281 for a in orig.get_anno_storage().get_annotations_for_item(&e)? {
282 self.add_edge_annotation(e.clone(), a)?;
283 }
284 }
285 }
286
287 self.stats = orig.get_statistics().cloned();
288 self.annos.calculate_statistics()?;
289 Ok(())
290 }
291
292 fn as_writeable(&mut self) -> Option<&mut dyn WriteableGraphStorage> {
293 Some(self)
294 }
295 fn as_edgecontainer(&self) -> &dyn EdgeContainer {
296 self
297 }
298
299 fn inverse_has_same_cost(&self) -> bool {
300 true
301 }
302}
303
304impl WriteableGraphStorage for DiskAdjacencyListStorage {
305 fn add_edge(&mut self, edge: Edge) -> Result<()> {
306 if edge.source != edge.target {
307 self.inverse_edges.insert(edge.inverse(), true)?;
309 self.edges.insert(edge, true)?;
310 self.stats = None;
311 }
312 Ok(())
313 }
314
315 fn add_edge_annotation(&mut self, edge: Edge, anno: Annotation) -> Result<()> {
316 if self.edges.get(&edge)?.is_some() {
317 self.annos.insert(edge, anno)?;
318 }
319 Ok(())
320 }
321
322 fn delete_edge(&mut self, edge: &Edge) -> Result<()> {
323 self.edges.remove(edge)?;
324 self.inverse_edges.remove(&edge.inverse())?;
325
326 self.annos.remove_item(edge)?;
327
328 Ok(())
329 }
330 fn delete_edge_annotation(&mut self, edge: &Edge, anno_key: &AnnoKey) -> Result<()> {
331 self.annos.remove_annotation_for_item(edge, anno_key)?;
332 Ok(())
333 }
334 fn delete_node(&mut self, node: NodeID) -> Result<()> {
335 let mut to_delete = std::collections::LinkedList::<Edge>::new();
337
338 for target in self.get_outgoing_edges(node) {
339 let target = target?;
340 to_delete.push_back(Edge {
341 source: node,
342 target,
343 });
344 }
345
346 for source in self.get_ingoing_edges(node) {
347 let source = source?;
348 to_delete.push_back(Edge {
349 source,
350 target: node,
351 });
352 }
353
354 for e in to_delete {
355 self.delete_edge(&e)?;
356 }
357
358 Ok(())
359 }
360
361 fn calculate_statistics(&mut self) -> Result<()> {
362 let mut stats = GraphStatistic {
363 max_depth: 1,
364 max_fan_out: 0,
365 avg_fan_out: 0.0,
366 fan_out_99_percentile: 0,
367 inverse_fan_out_99_percentile: 0,
368 cyclic: false,
369 rooted_tree: true,
370 nodes: 0,
371 dfs_visit_ratio: 0.0,
372 };
373
374 self.annos.calculate_statistics()?;
375
376 let mut has_incoming_edge: BTreeSet<NodeID> = BTreeSet::new();
377
378 let mut roots: BTreeSet<NodeID> = BTreeSet::new();
380 {
381 let mut all_nodes: BTreeSet<NodeID> = BTreeSet::new();
382 for edge in self.edges.iter()? {
383 let (e, _) = edge?;
384 roots.insert(e.source);
385 all_nodes.insert(e.source);
386 all_nodes.insert(e.target);
387
388 if stats.rooted_tree {
389 if has_incoming_edge.contains(&e.target) {
390 stats.rooted_tree = false;
391 } else {
392 has_incoming_edge.insert(e.target);
393 }
394 }
395 }
396 stats.nodes = all_nodes.len();
397 }
398
399 let edges_empty = self.edges.is_empty()?;
400
401 if !edges_empty {
402 for edge in self.edges.iter()? {
403 let (e, _) = edge?;
404 roots.remove(&e.target);
405 }
406 }
407
408 let fan_outs = get_fan_outs(&self.edges)?;
409 let sum_fan_out: usize = fan_outs.iter().sum();
410
411 if let Some(last) = fan_outs.last() {
412 stats.max_fan_out = *last;
413 }
414 let inverse_fan_outs = get_fan_outs(&self.inverse_edges)?;
415
416 if !fan_outs.is_empty() {
419 stats.fan_out_99_percentile = fan_outs[fan_outs.len() - 1];
420 }
421 if !inverse_fan_outs.is_empty() {
422 stats.inverse_fan_out_99_percentile = inverse_fan_outs[inverse_fan_outs.len() - 1];
423 }
424 if fan_outs.len() >= 100 {
426 let idx: usize = fan_outs.len() / 100;
427 if idx < fan_outs.len() {
428 stats.fan_out_99_percentile = fan_outs[idx];
429 }
430 }
431 if inverse_fan_outs.len() >= 100 {
432 let idx: usize = inverse_fan_outs.len() / 100;
433 if idx < inverse_fan_outs.len() {
434 stats.inverse_fan_out_99_percentile = inverse_fan_outs[idx];
435 }
436 }
437
438 let mut number_of_visits = 0;
439 if roots.is_empty() && !edges_empty {
440 stats.cyclic = true;
442 } else {
443 for root_node in &roots {
444 let mut dfs = CycleSafeDFS::new(self, *root_node, 0, usize::MAX);
445 for step in &mut dfs {
446 let step = step?;
447 number_of_visits += 1;
448 stats.max_depth = std::cmp::max(stats.max_depth, step.distance);
449 }
450 if dfs.is_cyclic() {
451 stats.cyclic = true;
452 }
453 }
454 }
455
456 if stats.cyclic {
457 stats.rooted_tree = false;
458 stats.max_depth = 0;
460 stats.dfs_visit_ratio = 0.0;
461 } else if stats.nodes > 0 {
462 stats.dfs_visit_ratio = f64::from(number_of_visits) / (stats.nodes as f64);
463 }
464
465 if sum_fan_out > 0 && stats.nodes > 0 {
466 stats.avg_fan_out = (sum_fan_out as f64) / (stats.nodes as f64);
467 }
468
469 self.stats = Some(stats);
470
471 Ok(())
472 }
473
474 fn clear(&mut self) -> Result<()> {
475 self.annos.clear()?;
476 self.edges.clear();
477 self.inverse_edges.clear();
478 self.stats = None;
479 Ok(())
480 }
481}
482
483#[cfg(test)]
484mod tests;