graphannis_core/graph/storage/
disk_path.rs

1use itertools::Itertools;
2use memmap2::{Mmap, MmapMut};
3use normpath::PathExt;
4use std::{
5    collections::HashSet, convert::TryInto, fs::File, io::BufReader, ops::Bound, path::PathBuf,
6};
7use tempfile::tempfile;
8use transient_btree_index::BtreeConfig;
9
10use crate::{
11    annostorage::{ondisk::AnnoStorageImpl, AnnotationStorage},
12    dfs::CycleSafeDFS,
13    errors::Result,
14    try_as_boxed_iter,
15    types::{Edge, NodeID},
16    util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
17};
18
19use super::{
20    load_statistics_from_location, save_statistics_to_toml, EdgeContainer, GraphStatistic,
21    GraphStorage,
22};
23use binary_layout::prelude::*;
24
/// Maximum number of path elements (ancestors) that are stored per node.
pub(crate) const MAX_DEPTH: usize = 15;
/// Identifier of this graph storage implementation. The `D15` suffix mirrors
/// the value of [`MAX_DEPTH`] and must be kept in sync with it by hand.
pub(crate) const SERIALIZATION_ID: &str = "DiskPathV1_D15";
/// Size in bytes of one entry in the paths file: a single length byte followed
/// by [`MAX_DEPTH`] node IDs of 8 bytes each.
const ENTRY_SIZE: usize = 1 + 8 * MAX_DEPTH;
28
// Binary layout of a single entry in the paths file.
//
// `length` is the number of valid node IDs in `nodes`; a value of 0 is treated
// as "no outgoing edge" by the readers in this file. `nodes` holds up to
// `MAX_DEPTH` node IDs of 8 bytes each; the code in this file encodes and
// decodes them explicitly with `to_le_bytes` / `from_le_bytes`.
binary_layout!(node_path, LittleEndian, {
    length: u8,
    nodes: [u8; MAX_DEPTH*8],
});
33
/// A [GraphStorage] that stores a single path for each node on disk.
pub struct DiskPathStorage {
    /// Read-only memory map of the paths file. The entry of node `n` starts at
    /// byte offset `n * ENTRY_SIZE`.
    paths: Mmap,
    /// Size of the mapped paths file in bytes; the number of entries is
    /// derived from it.
    paths_file_size: u64,
    /// Inverted direct edges (the key is `edge.inverse()`), used to answer
    /// queries for ingoing edges. The boolean value is always written as `true`.
    inverse_edges: DiskMap<Edge, bool>,
    /// Annotations of the direct edges.
    annos: AnnoStorageImpl<Edge>,
    /// Graph statistics, either cloned from the copied storage or loaded from disk.
    stats: Option<GraphStatistic>,
    /// Directory this storage was loaded from, `None` if it was not loaded from disk.
    location: Option<PathBuf>,
}
43
44fn offset_in_file(n: NodeID) -> u64 {
45    n * (ENTRY_SIZE as u64)
46}
47
/// Returns the byte offset inside the `nodes` array of an entry at which the
/// path element with index `path_idx` starts.
fn offset_in_path(path_idx: usize) -> usize {
    // Every path element is one node ID stored as 8 bytes.
    const NODE_ID_BYTES: usize = std::mem::size_of::<u64>();
    NODE_ID_BYTES * path_idx
}
51
52impl DiskPathStorage {
53    pub fn new() -> Result<DiskPathStorage> {
54        let paths = unsafe { Mmap::map(&tempfile()?)? };
55        Ok(DiskPathStorage {
56            paths,
57            paths_file_size: 0,
58            inverse_edges: DiskMap::default(),
59            location: None,
60            annos: AnnoStorageImpl::new(None)?,
61            stats: None,
62        })
63    }
64
65    fn get_outgoing_edge(&self, node: NodeID) -> Result<Option<NodeID>> {
66        if node > self.max_node_id()? {
67            return Ok(None);
68        }
69        let offset = offset_in_file(node) as usize;
70
71        let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
72        if view.length().read() == 0 {
73            // No outgoing edges
74            Ok(None)
75        } else {
76            // Read the node ID at the first position
77            let buffer: [u8; 8] = view.nodes()[offset_in_path(0)..offset_in_path(1)].try_into()?;
78            Ok(Some(u64::from_le_bytes(buffer)))
79        }
80    }
81
82    fn max_node_id(&self) -> Result<u64> {
83        let number_of_entries = self.paths_file_size / (ENTRY_SIZE as u64);
84        Ok(number_of_entries - 1)
85    }
86
87    fn path_for_node(&self, node: NodeID) -> Result<Vec<NodeID>> {
88        if node > self.max_node_id()? {
89            return Ok(Vec::default());
90        }
91        let offset = offset_in_file(node) as usize;
92
93        let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
94        let length = view.length().read();
95        if length == 0 {
96            // No outgoing edges
97            Ok(Vec::default())
98        } else {
99            // Add all path elements
100            let mut result = Vec::with_capacity(length as usize);
101            for i in 0..length {
102                let i = i as usize;
103                let element_buffer: [u8; 8] =
104                    view.nodes()[offset_in_path(i)..offset_in_path(i + 1)].try_into()?;
105                let ancestor_id = u64::from_le_bytes(element_buffer);
106                result.push(ancestor_id);
107            }
108
109            Ok(result)
110        }
111    }
112}
113
114impl EdgeContainer for DiskPathStorage {
115    fn get_outgoing_edges<'a>(
116        &'a self,
117        node: NodeID,
118    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
119        match self.get_outgoing_edge(node) {
120            Ok(Some(n)) => Box::new(std::iter::once(Ok(n))),
121            Ok(None) => Box::new(std::iter::empty()),
122            Err(e) => Box::new(std::iter::once(Err(e))),
123        }
124    }
125
126    fn get_ingoing_edges<'a>(
127        &'a self,
128        node: NodeID,
129    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
130        let lower_bound = Edge {
131            source: node,
132            target: NodeID::MIN,
133        };
134        let upper_bound = Edge {
135            source: node,
136            target: NodeID::MAX,
137        };
138        Box::new(
139            self.inverse_edges
140                .range(lower_bound..upper_bound)
141                .map_ok(|(e, _)| e.target),
142        )
143    }
144
145    fn has_ingoing_edges(&self, node: NodeID) -> Result<bool> {
146        let lower_bound = Edge {
147            source: node,
148            target: NodeID::MIN,
149        };
150        let upper_bound = Edge {
151            source: node,
152            target: NodeID::MAX,
153        };
154
155        if let Some(edge) = self.inverse_edges.range(lower_bound..upper_bound).next() {
156            edge?;
157            Ok(true)
158        } else {
159            Ok(false)
160        }
161    }
162
163    fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
164        let max_node_id = try_as_boxed_iter!(self.max_node_id());
165        // ignore node entries with empty path in result
166        let it = (0..=max_node_id)
167            .map(move |n| {
168                let offset = offset_in_file(n) as usize;
169                let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
170
171                let path_length = view.length().read();
172                if path_length == 0 {
173                    Ok(None)
174                } else {
175                    Ok(Some(n))
176                }
177            })
178            .filter_map_ok(|n| n);
179        Box::new(it)
180    }
181
182    fn get_statistics(&self) -> Option<&GraphStatistic> {
183        self.stats.as_ref()
184    }
185}
186
187impl GraphStorage for DiskPathStorage {
188    fn find_connected<'a>(
189        &'a self,
190        node: NodeID,
191        min_distance: usize,
192        max_distance: std::ops::Bound<usize>,
193    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
194        let mut result = Vec::default();
195        if min_distance == 0 {
196            result.push(Ok(node));
197        }
198
199        let path = try_as_boxed_iter!(self.path_for_node(node));
200        // The 0th index of the path is the node with distance 1, so always subtract 1
201        let start = min_distance.saturating_sub(1);
202
203        let end = match max_distance {
204            std::ops::Bound::Included(max_distance) => max_distance,
205            std::ops::Bound::Excluded(max_distance) => max_distance.saturating_sub(1),
206            std::ops::Bound::Unbounded => path.len(),
207        };
208        let end = end.min(path.len());
209        if start < end {
210            result.extend(path[start..end].iter().map(|n| Ok(*n)));
211        }
212        Box::new(result.into_iter())
213    }
214
215    fn find_connected_inverse<'a>(
216        &'a self,
217        node: NodeID,
218        min_distance: usize,
219        max_distance: std::ops::Bound<usize>,
220    ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
221        let mut visited = HashSet::<NodeID>::default();
222        let max_distance = match max_distance {
223            Bound::Unbounded => usize::MAX,
224            Bound::Included(max_distance) => max_distance,
225            Bound::Excluded(max_distance) => max_distance - 1,
226        };
227
228        let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
229            .filter_map_ok(move |x| {
230                if visited.insert(x.node) {
231                    Some(x.node)
232                } else {
233                    None
234                }
235            });
236        Box::new(it)
237    }
238
239    fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
240        let path = self.path_for_node(source)?;
241        // Find the target node in the path. The path starts at distance "0".
242        let result = path
243            .into_iter()
244            .position(|n| n == target)
245            .map(|idx| idx + 1);
246        Ok(result)
247    }
248
249    fn is_connected(
250        &self,
251        source: NodeID,
252        target: NodeID,
253        min_distance: usize,
254        max_distance: std::ops::Bound<usize>,
255    ) -> Result<bool> {
256        let path = self.path_for_node(source)?;
257        // There is a connection when the target node is located in the path (given the min/max constraints)
258        let start = min_distance.saturating_sub(1).clamp(0, path.len());
259        let end = match max_distance {
260            Bound::Unbounded => path.len(),
261            Bound::Included(max_distance) => max_distance,
262            Bound::Excluded(max_distance) => max_distance.saturating_sub(1),
263        };
264        let end = end.clamp(0, path.len());
265        for p in path.into_iter().take(end).skip(start) {
266            if p == target {
267                return Ok(true);
268            }
269        }
270        Ok(false)
271    }
272
273    fn get_anno_storage(&self) -> &dyn crate::annostorage::EdgeAnnotationStorage {
274        &self.annos
275    }
276
277    fn copy(
278        &mut self,
279        _node_annos: &dyn crate::annostorage::NodeAnnotationStorage,
280        orig: &dyn GraphStorage,
281    ) -> Result<()> {
282        self.inverse_edges.clear();
283
284        // Create a new file which is large enough to contain the paths for all nodes.
285        let max_node_id = orig
286            .source_nodes()
287            .fold_ok(0, |acc, node_id| acc.max(node_id))?;
288        let file_capacity = (max_node_id + 1) * (ENTRY_SIZE as u64);
289
290        let file = tempfile::tempfile()?;
291        if file_capacity > 0 {
292            file.set_len(file_capacity)?;
293        }
294        let mut mmap = unsafe { MmapMut::map_mut(&file)? };
295
296        // Get the paths for all source nodes in the original graph storage
297        for source in orig.source_nodes().sorted_by(|a, b| {
298            let a = a.as_ref().unwrap_or(&0);
299            let b = b.as_ref().unwrap_or(&0);
300            a.cmp(b)
301        }) {
302            let source = source?;
303
304            let offset = offset_in_file(source) as usize;
305            let mut path_view = node_path::View::new(&mut mmap[offset..(offset + ENTRY_SIZE)]);
306            let dfs = CycleSafeDFS::new(orig.as_edgecontainer(), source, 1, MAX_DEPTH);
307            for step in dfs {
308                let step = step?;
309                let target = step.node;
310
311                // Store directly outgoing edges in our inverse list
312                if step.distance == 1 {
313                    let edge = Edge { source, target };
314                    self.inverse_edges.insert(edge.inverse(), true)?;
315
316                    // Copy all annotations for this edge
317                    for a in orig.get_anno_storage().get_annotations_for_item(&edge)? {
318                        self.annos.insert(edge.clone(), a)?;
319                    }
320                }
321
322                // Set the new length
323                path_view.length_mut().write(step.distance.try_into()?);
324                // The distance starts at 1, but we do not repeat the source
325                // node in the path
326                let offset = offset_in_path(step.distance - 1);
327                // Set the node ID at the given position
328                let target_node_id_bytes = target.to_le_bytes();
329                path_view.nodes_mut()[offset..(offset + 8)]
330                    .copy_from_slice(&target_node_id_bytes[..]);
331            }
332        }
333
334        mmap.flush()?;
335        // Re-map file read-only
336
337        self.paths = unsafe { Mmap::map(&file)? };
338        self.paths_file_size = file_capacity;
339        self.stats = orig.get_statistics().cloned();
340        self.annos.calculate_statistics()?;
341        Ok(())
342    }
343
344    fn as_edgecontainer(&self) -> &dyn EdgeContainer {
345        self
346    }
347
348    fn serialization_id(&self) -> String {
349        SERIALIZATION_ID.to_string()
350    }
351
352    fn load_from(location: &std::path::Path) -> Result<Self>
353    where
354        Self: std::marker::Sized,
355    {
356        // Open the new paths file
357        let paths_file = location.join("paths.bin");
358        let paths = File::open(paths_file)?;
359        let paths_file_size = paths.metadata()?.len();
360        let paths = unsafe { Mmap::map(&paths)? };
361
362        // Load the inverse edges map
363        let inverse_edges = DiskMap::new(
364            Some(&location.join("inverse_edges.bin")),
365            EvictionStrategy::default(),
366            DEFAULT_BLOCK_CACHE_CAPACITY,
367            BtreeConfig::default()
368                .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
369                .fixed_value_size(2),
370        )?;
371
372        // Load annotation storage
373        let annos = AnnoStorageImpl::new(Some(
374            location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
375        ))?;
376
377        let stats = load_statistics_from_location(location)?;
378
379        Ok(Self {
380            paths,
381            paths_file_size,
382            inverse_edges,
383            annos,
384            stats,
385            location: Some(location.to_path_buf()),
386        })
387    }
388
389    fn save_to(&self, location: &std::path::Path) -> Result<()> {
390        // Make sure the output location exists before trying to normalize the paths
391        std::fs::create_dir_all(location)?;
392        // Normalize all paths to check if they are the same
393        let new_location = location.normalize()?;
394        if let Some(old_location) = &self.location {
395            let old_location = old_location.normalize()?;
396            if new_location == old_location {
397                // This is an immutable graph storage so there can't be any
398                // changes to write to the existing location we already use.
399                return Ok(());
400            }
401        }
402        // Copy the current paths file to the new location
403        let new_paths_file = new_location.join("paths.bin");
404        let mut new_paths = File::create(new_paths_file)?;
405        let mut old_reader = BufReader::new(&self.paths[..]);
406        std::io::copy(&mut old_reader, &mut new_paths)?;
407
408        // Copy the inverse edges map to the new location
409        self.inverse_edges
410            .write_to(&location.join("inverse_edges.bin"))?;
411
412        // Save edge annotations
413        self.annos.save_annotations_to(location)?;
414
415        save_statistics_to_toml(location, self.stats.as_ref())?;
416
417        Ok(())
418    }
419}
420
421#[cfg(test)]
422mod tests;