graphannis_core/graph/storage/
disk_path.rs1use itertools::Itertools;
2use memmap2::{Mmap, MmapMut};
3use normpath::PathExt;
4use std::{
5 collections::HashSet, convert::TryInto, fs::File, io::BufReader, ops::Bound, path::PathBuf,
6};
7use tempfile::tempfile;
8use transient_btree_index::BtreeConfig;
9
10use crate::{
11 annostorage::{ondisk::AnnoStorageImpl, AnnotationStorage},
12 dfs::CycleSafeDFS,
13 errors::Result,
14 try_as_boxed_iter,
15 types::{Edge, NodeID},
16 util::disk_collections::{DiskMap, EvictionStrategy, DEFAULT_BLOCK_CACHE_CAPACITY},
17};
18
19use super::{
20 load_statistics_from_location, save_statistics_to_toml, EdgeContainer, GraphStatistic,
21 GraphStorage,
22};
23use binary_layout::prelude::*;
24
/// Maximum number of path elements stored per node; nodes reachable only in
/// more steps are not recorded.
pub(crate) const MAX_DEPTH: usize = 15;

/// Identifier of this storage format (the `D15` suffix presumably mirrors
/// `MAX_DEPTH` — keep both in sync).
pub(crate) const SERIALIZATION_ID: &str = "DiskPathV1_D15";

/// Size in bytes of one per-node entry in the paths file: a one-byte path
/// length followed by `MAX_DEPTH` node IDs of 8 bytes each.
const ENTRY_SIZE: usize = 1 + 8 * MAX_DEPTH;
28
// On-disk layout of a single per-node entry in the paths file (`ENTRY_SIZE` bytes).
binary_layout!(node_path, LittleEndian, {
    // Number of valid node IDs stored in `nodes`; 0 means the node has no
    // outgoing edge and is not reported as a source node.
    length: u8,
    // Up to `MAX_DEPTH` node IDs, each encoded as 8 little-endian bytes.
    // Element `i` holds the node reached after `i + 1` steps.
    nodes: [u8; MAX_DEPTH*8],
});
33
/// Graph storage that keeps, for each node, the precomputed list of nodes
/// reachable from it (up to `MAX_DEPTH` steps) in a memory-mapped file.
pub struct DiskPathStorage {
    // Memory-mapped paths file with one fixed-size `node_path` entry per node ID.
    paths: Mmap,
    // Size of the paths file in bytes; used to derive the number of entries.
    paths_file_size: u64,
    // Index of inverted edges, used to answer ingoing-edge queries. The value is
    // always `true`; only the keys carry information.
    inverse_edges: DiskMap<Edge, bool>,
    // Annotations of the edges.
    annos: AnnoStorageImpl<Edge>,
    // Statistics taken from the copied storage or loaded from disk.
    stats: Option<GraphStatistic>,
    // Directory this storage was loaded from, if any.
    location: Option<PathBuf>,
}
43
44fn offset_in_file(n: NodeID) -> u64 {
45 n * (ENTRY_SIZE as u64)
46}
47
/// Byte offset of the `path_idx`-th node ID inside an entry's `nodes` field,
/// where every node ID occupies 8 bytes.
fn offset_in_path(path_idx: usize) -> usize {
    std::mem::size_of::<u64>() * path_idx
}
51
52impl DiskPathStorage {
53 pub fn new() -> Result<DiskPathStorage> {
54 let paths = unsafe { Mmap::map(&tempfile()?)? };
55 Ok(DiskPathStorage {
56 paths,
57 paths_file_size: 0,
58 inverse_edges: DiskMap::default(),
59 location: None,
60 annos: AnnoStorageImpl::new(None)?,
61 stats: None,
62 })
63 }
64
65 fn get_outgoing_edge(&self, node: NodeID) -> Result<Option<NodeID>> {
66 if node > self.max_node_id()? {
67 return Ok(None);
68 }
69 let offset = offset_in_file(node) as usize;
70
71 let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
72 if view.length().read() == 0 {
73 Ok(None)
75 } else {
76 let buffer: [u8; 8] = view.nodes()[offset_in_path(0)..offset_in_path(1)].try_into()?;
78 Ok(Some(u64::from_le_bytes(buffer)))
79 }
80 }
81
82 fn max_node_id(&self) -> Result<u64> {
83 let number_of_entries = self.paths_file_size / (ENTRY_SIZE as u64);
84 Ok(number_of_entries - 1)
85 }
86
87 fn path_for_node(&self, node: NodeID) -> Result<Vec<NodeID>> {
88 if node > self.max_node_id()? {
89 return Ok(Vec::default());
90 }
91 let offset = offset_in_file(node) as usize;
92
93 let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
94 let length = view.length().read();
95 if length == 0 {
96 Ok(Vec::default())
98 } else {
99 let mut result = Vec::with_capacity(length as usize);
101 for i in 0..length {
102 let i = i as usize;
103 let element_buffer: [u8; 8] =
104 view.nodes()[offset_in_path(i)..offset_in_path(i + 1)].try_into()?;
105 let ancestor_id = u64::from_le_bytes(element_buffer);
106 result.push(ancestor_id);
107 }
108
109 Ok(result)
110 }
111 }
112}
113
114impl EdgeContainer for DiskPathStorage {
115 fn get_outgoing_edges<'a>(
116 &'a self,
117 node: NodeID,
118 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
119 match self.get_outgoing_edge(node) {
120 Ok(Some(n)) => Box::new(std::iter::once(Ok(n))),
121 Ok(None) => Box::new(std::iter::empty()),
122 Err(e) => Box::new(std::iter::once(Err(e))),
123 }
124 }
125
126 fn get_ingoing_edges<'a>(
127 &'a self,
128 node: NodeID,
129 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
130 let lower_bound = Edge {
131 source: node,
132 target: NodeID::MIN,
133 };
134 let upper_bound = Edge {
135 source: node,
136 target: NodeID::MAX,
137 };
138 Box::new(
139 self.inverse_edges
140 .range(lower_bound..upper_bound)
141 .map_ok(|(e, _)| e.target),
142 )
143 }
144
145 fn has_ingoing_edges(&self, node: NodeID) -> Result<bool> {
146 let lower_bound = Edge {
147 source: node,
148 target: NodeID::MIN,
149 };
150 let upper_bound = Edge {
151 source: node,
152 target: NodeID::MAX,
153 };
154
155 if let Some(edge) = self.inverse_edges.range(lower_bound..upper_bound).next() {
156 edge?;
157 Ok(true)
158 } else {
159 Ok(false)
160 }
161 }
162
163 fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
164 let max_node_id = try_as_boxed_iter!(self.max_node_id());
165 let it = (0..=max_node_id)
167 .map(move |n| {
168 let offset = offset_in_file(n) as usize;
169 let view = node_path::View::new(&self.paths[offset..(offset + ENTRY_SIZE)]);
170
171 let path_length = view.length().read();
172 if path_length == 0 {
173 Ok(None)
174 } else {
175 Ok(Some(n))
176 }
177 })
178 .filter_map_ok(|n| n);
179 Box::new(it)
180 }
181
182 fn get_statistics(&self) -> Option<&GraphStatistic> {
183 self.stats.as_ref()
184 }
185}
186
187impl GraphStorage for DiskPathStorage {
188 fn find_connected<'a>(
189 &'a self,
190 node: NodeID,
191 min_distance: usize,
192 max_distance: std::ops::Bound<usize>,
193 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
194 let mut result = Vec::default();
195 if min_distance == 0 {
196 result.push(Ok(node));
197 }
198
199 let path = try_as_boxed_iter!(self.path_for_node(node));
200 let start = min_distance.saturating_sub(1);
202
203 let end = match max_distance {
204 std::ops::Bound::Included(max_distance) => max_distance,
205 std::ops::Bound::Excluded(max_distance) => max_distance.saturating_sub(1),
206 std::ops::Bound::Unbounded => path.len(),
207 };
208 let end = end.min(path.len());
209 if start < end {
210 result.extend(path[start..end].iter().map(|n| Ok(*n)));
211 }
212 Box::new(result.into_iter())
213 }
214
215 fn find_connected_inverse<'a>(
216 &'a self,
217 node: NodeID,
218 min_distance: usize,
219 max_distance: std::ops::Bound<usize>,
220 ) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
221 let mut visited = HashSet::<NodeID>::default();
222 let max_distance = match max_distance {
223 Bound::Unbounded => usize::MAX,
224 Bound::Included(max_distance) => max_distance,
225 Bound::Excluded(max_distance) => max_distance - 1,
226 };
227
228 let it = CycleSafeDFS::<'a>::new_inverse(self, node, min_distance, max_distance)
229 .filter_map_ok(move |x| {
230 if visited.insert(x.node) {
231 Some(x.node)
232 } else {
233 None
234 }
235 });
236 Box::new(it)
237 }
238
239 fn distance(&self, source: NodeID, target: NodeID) -> Result<Option<usize>> {
240 let path = self.path_for_node(source)?;
241 let result = path
243 .into_iter()
244 .position(|n| n == target)
245 .map(|idx| idx + 1);
246 Ok(result)
247 }
248
249 fn is_connected(
250 &self,
251 source: NodeID,
252 target: NodeID,
253 min_distance: usize,
254 max_distance: std::ops::Bound<usize>,
255 ) -> Result<bool> {
256 let path = self.path_for_node(source)?;
257 let start = min_distance.saturating_sub(1).clamp(0, path.len());
259 let end = match max_distance {
260 Bound::Unbounded => path.len(),
261 Bound::Included(max_distance) => max_distance,
262 Bound::Excluded(max_distance) => max_distance.saturating_sub(1),
263 };
264 let end = end.clamp(0, path.len());
265 for p in path.into_iter().take(end).skip(start) {
266 if p == target {
267 return Ok(true);
268 }
269 }
270 Ok(false)
271 }
272
273 fn get_anno_storage(&self) -> &dyn crate::annostorage::EdgeAnnotationStorage {
274 &self.annos
275 }
276
277 fn copy(
278 &mut self,
279 _node_annos: &dyn crate::annostorage::NodeAnnotationStorage,
280 orig: &dyn GraphStorage,
281 ) -> Result<()> {
282 self.inverse_edges.clear();
283
284 let max_node_id = orig
286 .source_nodes()
287 .fold_ok(0, |acc, node_id| acc.max(node_id))?;
288 let file_capacity = (max_node_id + 1) * (ENTRY_SIZE as u64);
289
290 let file = tempfile::tempfile()?;
291 if file_capacity > 0 {
292 file.set_len(file_capacity)?;
293 }
294 let mut mmap = unsafe { MmapMut::map_mut(&file)? };
295
296 for source in orig.source_nodes().sorted_by(|a, b| {
298 let a = a.as_ref().unwrap_or(&0);
299 let b = b.as_ref().unwrap_or(&0);
300 a.cmp(b)
301 }) {
302 let source = source?;
303
304 let offset = offset_in_file(source) as usize;
305 let mut path_view = node_path::View::new(&mut mmap[offset..(offset + ENTRY_SIZE)]);
306 let dfs = CycleSafeDFS::new(orig.as_edgecontainer(), source, 1, MAX_DEPTH);
307 for step in dfs {
308 let step = step?;
309 let target = step.node;
310
311 if step.distance == 1 {
313 let edge = Edge { source, target };
314 self.inverse_edges.insert(edge.inverse(), true)?;
315
316 for a in orig.get_anno_storage().get_annotations_for_item(&edge)? {
318 self.annos.insert(edge.clone(), a)?;
319 }
320 }
321
322 path_view.length_mut().write(step.distance.try_into()?);
324 let offset = offset_in_path(step.distance - 1);
327 let target_node_id_bytes = target.to_le_bytes();
329 path_view.nodes_mut()[offset..(offset + 8)]
330 .copy_from_slice(&target_node_id_bytes[..]);
331 }
332 }
333
334 mmap.flush()?;
335 self.paths = unsafe { Mmap::map(&file)? };
338 self.paths_file_size = file_capacity;
339 self.stats = orig.get_statistics().cloned();
340 self.annos.calculate_statistics()?;
341 Ok(())
342 }
343
344 fn as_edgecontainer(&self) -> &dyn EdgeContainer {
345 self
346 }
347
348 fn serialization_id(&self) -> String {
349 SERIALIZATION_ID.to_string()
350 }
351
352 fn load_from(location: &std::path::Path) -> Result<Self>
353 where
354 Self: std::marker::Sized,
355 {
356 let paths_file = location.join("paths.bin");
358 let paths = File::open(paths_file)?;
359 let paths_file_size = paths.metadata()?.len();
360 let paths = unsafe { Mmap::map(&paths)? };
361
362 let inverse_edges = DiskMap::new(
364 Some(&location.join("inverse_edges.bin")),
365 EvictionStrategy::default(),
366 DEFAULT_BLOCK_CACHE_CAPACITY,
367 BtreeConfig::default()
368 .fixed_key_size(std::mem::size_of::<NodeID>() * 2)
369 .fixed_value_size(2),
370 )?;
371
372 let annos = AnnoStorageImpl::new(Some(
374 location.join(crate::annostorage::ondisk::SUBFOLDER_NAME),
375 ))?;
376
377 let stats = load_statistics_from_location(location)?;
378
379 Ok(Self {
380 paths,
381 paths_file_size,
382 inverse_edges,
383 annos,
384 stats,
385 location: Some(location.to_path_buf()),
386 })
387 }
388
389 fn save_to(&self, location: &std::path::Path) -> Result<()> {
390 std::fs::create_dir_all(location)?;
392 let new_location = location.normalize()?;
394 if let Some(old_location) = &self.location {
395 let old_location = old_location.normalize()?;
396 if new_location == old_location {
397 return Ok(());
400 }
401 }
402 let new_paths_file = new_location.join("paths.bin");
404 let mut new_paths = File::create(new_paths_file)?;
405 let mut old_reader = BufReader::new(&self.paths[..]);
406 std::io::copy(&mut old_reader, &mut new_paths)?;
407
408 self.inverse_edges
410 .write_to(&location.join("inverse_edges.bin"))?;
411
412 self.annos.save_annotations_to(location)?;
414
415 save_statistics_to_toml(location, self.stats.as_ref())?;
416
417 Ok(())
418 }
419}
420
// Unit tests for this storage live in a separate `tests` module file.
#[cfg(test)]
mod tests;