graphannis_core/annostorage/
ondisk.rs

1use crate::annostorage::AnnotationStorage;
2use crate::annostorage::symboltable::SymbolTable;
3use crate::annostorage::{Match, ValueSearch};
4use crate::errors::Result;
5use crate::graph::NODE_NAME_KEY;
6use crate::serializer::{FixedSizeKeySerializer, KeySerializer};
7use crate::types::{AnnoKey, Annotation, Edge, NodeID};
8use crate::util::disk_collections::{DiskMap, EvictionStrategy};
9use crate::{try_as_boxed_iter, util};
10use core::ops::Bound::*;
11use itertools::Itertools;
12use rand::seq::IteratorRandom;
13use regex_syntax::Parser;
14use regex_syntax::hir::literal::Seq;
15use serde_bytes::ByteBuf;
16use std::borrow::Cow;
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use transient_btree_index::BtreeConfig;
21
22use super::{EdgeAnnotationStorage, NodeAnnotationStorage};
23
/// Folder name constant for the on-disk node annotation storage.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// caller joins it to the graph location before calling `new` — confirm.
pub const SUBFOLDER_NAME: &str = "nodes_diskmap_v1";

/// Eviction strategy that is passed to every `DiskMap` created by this storage.
const EVICTION_STRATEGY: EvictionStrategy = EvictionStrategy::MaximumItems(10_000);
/// Block cache capacity that is passed to every `DiskMap` created by this storage.
pub const BLOCK_CACHE_CAPACITY: usize = 32 * crate::util::disk_collections::MB;
28
/// An on-disk implementation of an annotation storage.
///
/// The annotations are kept in two disk maps: one that is ordered by the
/// annotated item and one that is ordered by the annotation key and value.
pub struct AnnoStorageImpl<T>
where
    T: FixedSizeKeySerializer
        + Send
        + Sync
        + Clone
        + serde::ser::Serialize
        + serde::de::DeserializeOwned,
{
    /// Maps `[item ID][annotation key symbol]` to the annotation value.
    by_container: DiskMap<ByteBuf, String>,
    /// Index with `[annotation key symbol][value]\0[item ID]` as key.
    /// The value is only a marker, all information is part of the key.
    by_anno_qname: DiskMap<ByteBuf, bool>,
    /// The directory given to the constructor or the path of the temporary directory.
    location: PathBuf,
    /// A handle to a temporary directory. This must be part of the struct because the temporary directory will
    /// be deleted when this handle is dropped.
    #[allow(dead_code)]
    temp_dir: Option<tempfile::TempDir>,

    /// Assigns a numeric symbol to each annotation key; the symbol is what is stored in the disk map keys.
    anno_key_symbols: SymbolTable<AnnoKey>,

    /// Number of annotations for each annotation key.
    anno_key_sizes: BTreeMap<AnnoKey, usize>,

    /// Sampled histograms for each annotation key.
    /// Each histogram bound defines a range of values where we estimate that they have the same number of occurrences.
    histogram_bounds: BTreeMap<AnnoKey, Vec<String>>,
    /// The largest item that has been inserted so far (`None` if nothing was inserted yet).
    largest_item: Option<T>,

    phantom: std::marker::PhantomData<T>,
}
58
59/// Creates a key for the `by_container` tree.
60///
61/// Structure:
62/// ```text
63/// [x Bits item ID][64 Bits symbol ID]
64/// ```
65fn create_by_container_key<T: FixedSizeKeySerializer>(item: T, anno_key_symbol: usize) -> ByteBuf {
66    let mut result: ByteBuf = ByteBuf::from(item.create_key().to_vec());
67    result.extend(anno_key_symbol.create_key());
68    result
69}
70
71/// Creates a key for the `by_anno_qname` tree.
72///
73/// Since the same (name, ns, value) triple can be used by multiple nodes and we want to avoid
74/// arrays as values, the node ID is part of the key and makes it unique.
75///
76/// Structure:
77/// ```text
78/// [64 Bits Annotation Key Symbol][Value]\0[x Bits item ID]
79/// ```
80fn create_by_anno_qname_key<T: FixedSizeKeySerializer>(
81    item: T,
82    anno_key_symbol: usize,
83    anno_value: &str,
84) -> ByteBuf {
85    // Use the qualified annotation name, the value and the node ID as key for the indexes.
86    let mut result: ByteBuf = ByteBuf::from(anno_key_symbol.create_key().to_vec());
87    for b in anno_value.as_bytes() {
88        result.push(*b);
89    }
90    result.push(0);
91    let item_key: &[u8] = &item.create_key();
92    result.extend(item_key);
93    result
94}
95
96impl<T> AnnoStorageImpl<T>
97where
98    T: FixedSizeKeySerializer
99        + Send
100        + Sync
101        + Clone
102        + Default
103        + serde::ser::Serialize
104        + serde::de::DeserializeOwned,
105    (T, Arc<AnnoKey>): Into<Match>,
106{
107    pub fn new(path: Option<PathBuf>) -> Result<AnnoStorageImpl<T>> {
108        if let Some(path) = path {
109            let path_by_container = path.join("by_container.bin");
110            let path_by_anno_qname = path.join("by_anno_qname.bin");
111
112            let mut result = AnnoStorageImpl {
113                by_container: DiskMap::new(
114                    Some(&path_by_container),
115                    EVICTION_STRATEGY,
116                    BLOCK_CACHE_CAPACITY,
117                    BtreeConfig::default().fixed_key_size(T::key_size() + 16),
118                )?,
119                by_anno_qname: DiskMap::new(
120                    Some(&path_by_anno_qname),
121                    EVICTION_STRATEGY,
122                    BLOCK_CACHE_CAPACITY,
123                    BtreeConfig::default(),
124                )?,
125                anno_key_symbols: SymbolTable::default(),
126                anno_key_sizes: BTreeMap::new(),
127                largest_item: None,
128                histogram_bounds: BTreeMap::new(),
129                location: path.clone(),
130                temp_dir: None,
131                phantom: std::marker::PhantomData,
132            };
133
134            // load internal helper fields
135            let custom_path = path.join("custom.bin");
136            let f = std::fs::File::open(custom_path)?;
137            let mut reader = std::io::BufReader::new(f);
138            result.largest_item = bincode::deserialize_from(&mut reader)?;
139            result.anno_key_sizes = bincode::deserialize_from(&mut reader)?;
140            result.histogram_bounds = bincode::deserialize_from(&mut reader)?;
141            result.anno_key_symbols = bincode::deserialize_from(&mut reader)?;
142            result.anno_key_symbols.after_deserialization();
143
144            Ok(result)
145        } else {
146            let tmp_dir = tempfile::Builder::new()
147                .prefix("graphannis-ondisk-nodeanno-")
148                .tempdir()?;
149            Ok(AnnoStorageImpl {
150                by_container: DiskMap::new_temporary(
151                    EVICTION_STRATEGY,
152                    BLOCK_CACHE_CAPACITY,
153                    BtreeConfig::default().fixed_key_size(T::key_size() + 16),
154                ),
155                by_anno_qname: DiskMap::new_temporary(
156                    EVICTION_STRATEGY,
157                    BLOCK_CACHE_CAPACITY,
158                    BtreeConfig::default(),
159                ),
160                anno_key_symbols: SymbolTable::default(),
161                anno_key_sizes: BTreeMap::new(),
162                largest_item: None,
163                histogram_bounds: BTreeMap::new(),
164                location: tmp_dir.as_ref().to_path_buf(),
165                temp_dir: Some(tmp_dir),
166                phantom: std::marker::PhantomData,
167            })
168        }
169    }
170
171    fn matching_items<'a>(
172        &'a self,
173        namespace: Option<&str>,
174        name: &str,
175        value: Option<&str>,
176    ) -> Box<dyn Iterator<Item = Result<(T, Arc<AnnoKey>)>> + 'a>
177    where
178        T: FixedSizeKeySerializer + Send + Sync + PartialOrd,
179    {
180        let key_ranges: Vec<Arc<AnnoKey>> = if let Some(ns) = namespace {
181            vec![Arc::from(AnnoKey {
182                ns: ns.into(),
183                name: name.into(),
184            })]
185        } else {
186            let qnames = match self.get_qnames(name) {
187                Ok(qnames) => qnames,
188                Err(e) => return Box::new(std::iter::once(Err(e))),
189            };
190            qnames.into_iter().map(Arc::from).collect()
191        };
192
193        let value = value.map(|v| v.to_string());
194
195        let it = key_ranges
196            .into_iter()
197            .filter_map(move |k| self.anno_key_symbols.get_symbol(&k))
198            .flat_map(move |anno_key_symbol| {
199                let lower_bound_value = if let Some(value) = &value { value } else { "" };
200                let lower_bound =
201                    create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, lower_bound_value);
202
203                let upper_bound_value = if let Some(value) = &value {
204                    Cow::Borrowed(value)
205                } else {
206                    Cow::Owned(std::char::MAX.to_string())
207                };
208
209                let upper_bound =
210                    create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &upper_bound_value);
211                self.by_anno_qname.range(lower_bound..upper_bound)
212            })
213            .fuse()
214            .map(move |item| match item {
215                Ok((data, _)) => {
216                    // get the item ID at the end
217                    let item_id = T::parse_key(&data[data.len() - T::key_size()..])?;
218                    let anno_key_symbol = usize::parse_key(&data[0..std::mem::size_of::<usize>()])?;
219                    let key = self
220                        .anno_key_symbols
221                        .get_value(anno_key_symbol)
222                        .unwrap_or_default();
223                    Ok((item_id, key))
224                }
225                Err(e) => Err(e),
226            });
227
228        Box::new(it)
229    }
230
231    fn matching_items_by_prefix<'a>(
232        &'a self,
233        namespace: Option<&str>,
234        name: &str,
235        prefix: String,
236    ) -> Box<dyn Iterator<Item = Result<(T, Arc<AnnoKey>)>> + 'a>
237    where
238        T: FixedSizeKeySerializer + Send + Sync + PartialOrd,
239    {
240        let key_ranges: Vec<Arc<AnnoKey>> = if let Some(ns) = namespace {
241            vec![Arc::from(AnnoKey {
242                ns: ns.into(),
243                name: name.into(),
244            })]
245        } else {
246            let qnames = match self.get_qnames(name) {
247                Ok(qnames) => qnames,
248                Err(e) => return Box::new(std::iter::once(Err(e))),
249            };
250            qnames.into_iter().map(Arc::from).collect()
251        };
252
253        let it = key_ranges
254            .into_iter()
255            .filter_map(move |k| self.anno_key_symbols.get_symbol(&k))
256            .flat_map(move |anno_key_symbol| {
257                let lower_bound = create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, &prefix);
258
259                let mut upper_val = prefix.clone();
260                upper_val.push(std::char::MAX);
261                let upper_bound =
262                    create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &upper_val);
263                self.by_anno_qname.range(lower_bound..upper_bound)
264            })
265            .fuse()
266            .map(move |item| match item {
267                Ok((data, _)) => {
268                    // get the item ID at the end
269                    let item_id = T::parse_key(&data[data.len() - T::key_size()..])?;
270                    let anno_key_symbol = usize::parse_key(&data[0..std::mem::size_of::<usize>()])?;
271                    let key = self
272                        .anno_key_symbols
273                        .get_value(anno_key_symbol)
274                        .unwrap_or_default();
275                    Ok((item_id, key))
276                }
277                Err(e) => Err(e),
278            });
279
280        Box::new(it)
281    }
282
283    /// Parse the raw data and extract the item ID and the annotation key.
284    fn parse_by_container_key(&self, data: ByteBuf) -> Result<(T, Arc<AnnoKey>)> {
285        let item = T::parse_key(&data[0..T::key_size()])?;
286        let anno_key_symbol = usize::parse_key(&data[T::key_size()..])?;
287
288        let result = (
289            item,
290            self.anno_key_symbols
291                .get_value(anno_key_symbol)
292                .unwrap_or_default(),
293        );
294        Ok(result)
295    }
296
297    /// Parse the raw data and extract the node ID and the annotation.
298    fn parse_by_anno_qname_key(&self, mut data: ByteBuf) -> Result<(T, Arc<AnnoKey>, String)> {
299        // get the item ID at the end
300        let data_len = data.len();
301        let item_id_raw = data.split_off(data_len - T::key_size());
302        let item_id = T::parse_key(&item_id_raw)?;
303
304        // remove the trailing '\0' character
305        data.pop();
306
307        // split off the annotation value string
308        let anno_val_raw = data.split_off(std::mem::size_of::<usize>());
309        let anno_val = String::from_utf8(anno_val_raw)?;
310
311        // parse the remaining annotation key symbol
312        let anno_key_symbol = usize::parse_key(&data)?;
313
314        let result = (
315            item_id,
316            self.anno_key_symbols
317                .get_value(anno_key_symbol)
318                .unwrap_or_default(),
319            anno_val,
320        );
321        Ok(result)
322    }
323
324    fn get_by_anno_qname_range<'a>(
325        &'a self,
326        anno_key: &AnnoKey,
327    ) -> Box<dyn Iterator<Item = Result<(ByteBuf, bool)>> + 'a> {
328        if let Some(anno_key_symbol) = self.anno_key_symbols.get_symbol(anno_key) {
329            let lower_bound = create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, "");
330
331            let upper_bound =
332                create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &std::char::MAX.to_string());
333
334            Box::new(self.by_anno_qname.range(lower_bound..upper_bound))
335        } else {
336            Box::from(std::iter::empty())
337        }
338    }
339}
340
341impl<T> AnnotationStorage<T> for AnnoStorageImpl<T>
342where
343    T: FixedSizeKeySerializer
344        + Send
345        + Sync
346        + PartialOrd
347        + Clone
348        + Default
349        + serde::ser::Serialize
350        + serde::de::DeserializeOwned,
351    (T, Arc<AnnoKey>): Into<Match>,
352{
353    fn insert(&mut self, item: T, anno: Annotation) -> Result<()> {
354        // make sure the symbol ID for this annotation key is created
355        let anno_key_symbol = self.anno_key_symbols.insert(anno.key.clone())?;
356
357        // insert the value into main tree
358        let by_container_key = create_by_container_key(item.clone(), anno_key_symbol);
359
360        // Check if the item already exists. This needs to access the disk tables,
361        // so avoid the check if we already know the new item is larger than the previous largest
362        // item and thus can't exist yet.
363        let item_smaller_than_largest = self
364            .largest_item
365            .as_ref()
366            .is_none_or(|largest_item| item <= *largest_item);
367        let already_existed =
368            item_smaller_than_largest && self.by_container.contains_key(&by_container_key)?;
369        self.by_container
370            .insert(by_container_key, anno.val.clone())?;
371
372        // To save some space, insert an boolean value as a marker value
373        // (all information is part of the key already)
374        self.by_anno_qname.insert(
375            create_by_anno_qname_key(item.clone(), anno_key_symbol, &anno.val),
376            true,
377        )?;
378
379        if !already_existed {
380            // a new annotation entry was inserted and did not replace an existing one
381            if let Some(largest_item) = self.largest_item.clone() {
382                if largest_item < item {
383                    self.largest_item = Some(item);
384                }
385            } else {
386                self.largest_item = Some(item);
387            }
388
389            let anno_key_entry = self.anno_key_sizes.entry(anno.key).or_insert(0);
390            *anno_key_entry += 1;
391        }
392
393        Ok(())
394    }
395
396    fn get_annotations_for_item(&self, item: &T) -> Result<Vec<Annotation>> {
397        let mut result = Vec::default();
398        let start = create_by_container_key(item.clone(), usize::MIN);
399        let end = create_by_container_key(item.clone(), usize::MAX);
400        for anno in self.by_container.range(start..=end) {
401            let (key, val) = anno?;
402            let parsed_key = self.parse_by_container_key(key)?;
403            let anno = Annotation {
404                key: parsed_key.1.as_ref().clone(),
405                val,
406            };
407            result.push(anno);
408        }
409
410        Ok(result)
411    }
412
413    fn remove_item(&mut self, item: &T) -> Result<bool> {
414        let mut result = false;
415
416        let start = create_by_container_key(item.clone(), usize::MIN);
417        let end = create_by_container_key(item.clone(), usize::MAX);
418
419        let annos_in_container: Result<Vec<_>> = self.by_container.range(start..=end).collect();
420        let annos_in_container = annos_in_container?;
421
422        for (by_container_key, _) in annos_in_container {
423            let symbol_id = usize::parse_key(&by_container_key[T::key_size()..])?;
424            let key = self
425                .anno_key_symbols
426                .get_value(symbol_id)
427                .unwrap_or_default();
428
429            if let Some(val) = self.by_container.remove(&by_container_key)? {
430                // remove annotation from by_anno_qname
431                let anno = Annotation {
432                    key: key.as_ref().clone(),
433                    val,
434                };
435
436                self.by_anno_qname.remove(&create_by_anno_qname_key(
437                    item.clone(),
438                    symbol_id,
439                    &anno.val,
440                ))?;
441                // decrease the annotation count for this key
442                let new_key_count: usize =
443                    if let Some(num_of_keys) = self.anno_key_sizes.get_mut(&key) {
444                        *num_of_keys -= 1;
445                        *num_of_keys
446                    } else {
447                        0
448                    };
449                // if annotation count dropped to zero remove the key
450                if new_key_count == 0 {
451                    self.anno_key_sizes.remove(&key);
452                    if let Some(id) = self.anno_key_symbols.get_symbol(&key) {
453                        self.anno_key_symbols.remove(id);
454                    }
455                }
456
457                result = true;
458            }
459        }
460
461        Ok(result)
462    }
463
464    fn remove_annotation_for_item(
465        &mut self,
466        item: &T,
467        key: &AnnoKey,
468    ) -> Result<Option<Cow<'_, str>>> {
469        // remove annotation from by_container
470        if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
471            let by_container_key = create_by_container_key(item.clone(), symbol_id);
472            if let Some(val) = self.by_container.remove(&by_container_key)? {
473                // remove annotation from by_anno_qname
474                let anno = Annotation {
475                    key: key.clone(),
476                    val,
477                };
478
479                self.by_anno_qname.remove(&create_by_anno_qname_key(
480                    item.clone(),
481                    symbol_id,
482                    &anno.val,
483                ))?;
484                // decrease the annotation count for this key
485                let new_key_count: usize =
486                    if let Some(num_of_keys) = self.anno_key_sizes.get_mut(key) {
487                        *num_of_keys -= 1;
488                        *num_of_keys
489                    } else {
490                        0
491                    };
492                // if annotation count dropped to zero remove the key
493                if new_key_count == 0 {
494                    self.anno_key_sizes.remove(key);
495                    if let Some(id) = self.anno_key_symbols.get_symbol(key) {
496                        self.anno_key_symbols.remove(id);
497                    }
498                }
499
500                return Ok(Some(Cow::Owned(anno.val)));
501            }
502        }
503        Ok(None)
504    }
505
506    fn clear(&mut self) -> Result<()> {
507        self.by_container.clear();
508        self.by_anno_qname.clear();
509
510        self.largest_item = None;
511        self.anno_key_sizes.clear();
512        self.histogram_bounds.clear();
513
514        Ok(())
515    }
516
517    fn get_qnames(&self, name: &str) -> Result<Vec<AnnoKey>> {
518        let it = self.anno_key_sizes.range(
519            AnnoKey {
520                name: name.into(),
521                ns: String::default(),
522            }..,
523        );
524        let mut result: Vec<AnnoKey> = Vec::default();
525        for (k, _) in it {
526            if k.name == name {
527                result.push(k.clone());
528            } else {
529                break;
530            }
531        }
532        Ok(result)
533    }
534
535    fn number_of_annotations(&self) -> Result<usize> {
536        let result = self.by_container.iter()?.count();
537        Ok(result)
538    }
539
540    fn is_empty(&self) -> Result<bool> {
541        self.by_container.is_empty()
542    }
543
544    fn get_value_for_item(&self, item: &T, key: &AnnoKey) -> Result<Option<Cow<'_, str>>> {
545        if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
546            let raw = self
547                .by_container
548                .get(&create_by_container_key(item.clone(), symbol_id))?;
549            if let Some(val) = raw {
550                return match val {
551                    Cow::Borrowed(val) => Ok(Some(Cow::Borrowed(val.as_str()))),
552                    Cow::Owned(val) => Ok(Some(Cow::Owned(val))),
553                };
554            }
555        }
556        Ok(None)
557    }
558
559    fn has_value_for_item(&self, item: &T, key: &AnnoKey) -> Result<bool> {
560        if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
561            let result = self
562                .by_container
563                .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
564            Ok(result)
565        } else {
566            Ok(false)
567        }
568    }
569
570    fn get_keys_for_iterator<'b>(
571        &'b self,
572        ns: Option<&str>,
573        name: Option<&str>,
574        it: Box<
575            dyn Iterator<Item = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>>
576                + 'b,
577        >,
578    ) -> Result<Vec<Match>> {
579        if let Some(name) = name {
580            if let Some(ns) = ns {
581                // return the only possible annotation for each node
582                let key = Arc::from(AnnoKey {
583                    ns: ns.into(),
584                    name: name.into(),
585                });
586                let mut matches = Vec::new();
587                if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
588                    // create a template key
589                    let mut container_key = create_by_container_key(T::default(), symbol_id);
590                    for item in it {
591                        let item = item?;
592                        // Set the first bytes to the ID of the item.
593                        // This saves the repeated expensive construction of the annotation key part.
594                        container_key[0..T::key_size()].copy_from_slice(&item.create_key());
595                        let does_contain_key = self.by_container.contains_key(&container_key)?;
596                        if does_contain_key {
597                            matches.push((item, key.clone()).into());
598                        }
599                    }
600                }
601                Ok(matches)
602            } else {
603                let mut matching_qnames: Vec<(ByteBuf, Arc<AnnoKey>)> = self
604                    .get_qnames(name)?
605                    .into_iter()
606                    .filter_map(|key| {
607                        if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
608                            let serialized_key = create_by_container_key(T::default(), symbol_id);
609                            Some((serialized_key, Arc::from(key)))
610                        } else {
611                            None
612                        }
613                    })
614                    .collect();
615                // return all annotations with the correct name for each node
616                let mut matches = Vec::new();
617                for item in it {
618                    let item = item?;
619                    for (container_key, anno_key) in matching_qnames.iter_mut() {
620                        // Set the first bytes to the ID of the item.
621                        // This saves the repeated expensive construction of the annotation key part.
622                        container_key[0..T::key_size()].copy_from_slice(&item.create_key());
623                        let does_contain_key = self.by_container.contains_key(container_key)?;
624                        if does_contain_key {
625                            matches.push((item.clone(), anno_key.clone()).into());
626                        }
627                    }
628                }
629                Ok(matches)
630            }
631        } else {
632            // get all annotation keys for this item
633            let mut matches = Vec::new();
634            for item in it {
635                let item = item?;
636                let start = create_by_container_key(item.clone(), usize::MIN);
637                let end = create_by_container_key(item, usize::MAX);
638                for anno in self.by_container.range(start..=end) {
639                    let (data, _) = anno?;
640                    let m = self.parse_by_container_key(data)?.into();
641                    matches.push(m);
642                }
643            }
644            Ok(matches)
645        }
646    }
647
648    fn number_of_annotations_by_name(&self, ns: Option<&str>, name: &str) -> Result<usize> {
649        let qualified_keys = match ns {
650            Some(ns) => self.anno_key_sizes.range((
651                Included(AnnoKey {
652                    name: name.into(),
653                    ns: ns.into(),
654                }),
655                Included(AnnoKey {
656                    name: name.into(),
657                    ns: ns.into(),
658                }),
659            )),
660            None => self.anno_key_sizes.range(
661                AnnoKey {
662                    name: name.into(),
663                    ns: String::default(),
664                }..AnnoKey {
665                    name: name.into(),
666                    ns: std::char::MAX.to_string(),
667                },
668            ),
669        };
670        let mut result = 0;
671        for (_anno_key, anno_size) in qualified_keys {
672            result += anno_size;
673        }
674        Ok(result)
675    }
676
677    fn exact_anno_search<'a>(
678        &'a self,
679        namespace: Option<&str>,
680        name: &str,
681        value: ValueSearch<&str>,
682    ) -> Box<dyn Iterator<Item = Result<Match>> + 'a> {
683        match value {
684            ValueSearch::Any => {
685                let it = self
686                    .matching_items(namespace, name, None)
687                    .map_ok(|item| item.into());
688                Box::new(it)
689            }
690            ValueSearch::Some(value) => {
691                let it = self
692                    .matching_items(namespace, name, Some(value))
693                    .map_ok(|item| item.into());
694                Box::new(it)
695            }
696            ValueSearch::NotSome(value) => {
697                let value = value.to_string();
698                let it = self
699                    .matching_items(namespace, name, None)
700                    .map(move |item| match item {
701                        Ok((node, anno_key)) => {
702                            let value = self.get_value_for_item(&node, &anno_key)?;
703                            Ok((node, anno_key, value))
704                        }
705                        Err(e) => Err(e),
706                    })
707                    .filter_map_ok(move |(item, anno_key, item_value)| {
708                        if let Some(item_value) = item_value
709                            && item_value != value
710                        {
711                            return Some((item, anno_key).into());
712                        }
713                        None
714                    });
715                Box::new(it)
716            }
717        }
718    }
719
720    fn regex_anno_search<'a>(
721        &'a self,
722        namespace: Option<&str>,
723        name: &str,
724        pattern: &str,
725        negated: bool,
726    ) -> Box<dyn Iterator<Item = Result<Match>> + 'a> {
727        let full_match_pattern = util::regex_full_match(pattern);
728
729        if let Ok((compiled_regex, parsed_regex)) =
730            util::compile_and_parse_regex(&full_match_pattern)
731        {
732            if negated {
733                let it = self
734                    .matching_items(namespace, name, None)
735                    .map(move |item| match item {
736                        Ok((node, anno_key)) => {
737                            let value = self.get_value_for_item(&node, &anno_key)?;
738                            Ok((node, anno_key, value))
739                        }
740                        Err(e) => Err(e),
741                    })
742                    .filter_ok(move |(_, _, val)| {
743                        if let Some(val) = val {
744                            !compiled_regex.is_match(val)
745                        } else {
746                            false
747                        }
748                    })
749                    .map_ok(move |(node, anno_key, _val)| (node, anno_key).into());
750                Box::new(it)
751            } else {
752                let regex_literal_sequence = regex_syntax::hir::literal::Extractor::new()
753                    .extract(&parsed_regex)
754                    .literals()
755                    .map(Seq::new);
756
757                let prefix_bytes = regex_literal_sequence
758                    .map(|seq| Vec::from(seq.longest_common_prefix().unwrap_or_default()))
759                    .unwrap_or_default();
760                let prefix = try_as_boxed_iter!(std::str::from_utf8(&prefix_bytes));
761
762                let it = self
763                    .matching_items_by_prefix(namespace, name, prefix.to_string())
764                    .map(move |item| match item {
765                        Ok((node, anno_key)) => {
766                            let value = self.get_value_for_item(&node, &anno_key)?;
767                            Ok((node, anno_key, value))
768                        }
769                        Err(e) => Err(e),
770                    })
771                    .filter_ok(move |(_, _, val)| {
772                        if let Some(val) = val {
773                            compiled_regex.is_match(val)
774                        } else {
775                            false
776                        }
777                    })
778                    .map_ok(move |(node, anno_key, _val)| (node, anno_key).into());
779                Box::new(it)
780            }
781        } else if negated {
782            // return all values
783            self.exact_anno_search(namespace, name, None.into())
784        } else {
785            // if regular expression pattern is invalid return empty iterator
786            Box::new(std::iter::empty())
787        }
788    }
789
790    fn get_all_keys_for_item(
791        &self,
792        item: &T,
793        ns: Option<&str>,
794        name: Option<&str>,
795    ) -> Result<Vec<Arc<AnnoKey>>> {
796        if let Some(name) = name {
797            if let Some(ns) = ns {
798                let key = Arc::from(AnnoKey {
799                    ns: ns.into(),
800                    name: name.into(),
801                });
802                if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
803                    let does_contain_key = self
804                        .by_container
805                        .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
806                    if does_contain_key {
807                        return Ok(vec![key.clone()]);
808                    }
809                }
810                Ok(vec![])
811            } else {
812                // get all qualified names for the given annotation name
813                let res: Result<Vec<Arc<AnnoKey>>> = self
814                    .get_qnames(name)?
815                    .into_iter()
816                    .map(|key| {
817                        if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
818                            let does_contain_key = self
819                                .by_container
820                                .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
821                            Ok((does_contain_key, key))
822                        } else {
823                            Ok((false, key))
824                        }
825                    })
826                    .filter_ok(|(does_contain_key, _)| *does_contain_key)
827                    .map_ok(|(_, key)| Arc::from(key))
828                    .collect();
829                let res = res?;
830                Ok(res)
831            }
832        } else {
833            // no annotation name given, return all
834            let result = self
835                .get_annotations_for_item(item)?
836                .into_iter()
837                .map(|anno| Arc::from(anno.key))
838                .collect();
839            Ok(result)
840        }
841    }
842
843    fn guess_max_count(
844        &self,
845        ns: Option<&str>,
846        name: &str,
847        lower_val: &str,
848        upper_val: &str,
849    ) -> Result<usize> {
850        // find all complete keys which have the given name (and namespace if given)
851        let qualified_keys = match ns {
852            Some(ns) => vec![AnnoKey {
853                name: name.into(),
854                ns: ns.into(),
855            }],
856            None => self.get_qnames(name)?,
857        };
858
859        let mut universe_size: usize = 0;
860        let mut sum_histogram_buckets: usize = 0;
861        let mut count_matches: usize = 0;
862
863        // guess for each fully qualified annotation key and return the sum of all guesses
864        for anno_key in qualified_keys {
865            if let Some(anno_size) = self.anno_key_sizes.get(&anno_key) {
866                universe_size += *anno_size;
867
868                if let Some(histo) = self.histogram_bounds.get(&anno_key) {
869                    // find the range in which the value is contained
870
871                    // we need to make sure the histogram is not empty -> should have at least two bounds
872                    if histo.len() >= 2 {
873                        sum_histogram_buckets += histo.len() - 1;
874
875                        for i in 0..histo.len() - 1 {
876                            let bucket_begin = &histo[i];
877                            let bucket_end = &histo[i + 1];
878                            // check if the range overlaps with the search range
879                            if bucket_begin.as_str() <= upper_val
880                                && lower_val <= bucket_end.as_str()
881                            {
882                                count_matches += 1;
883                            }
884                        }
885                    }
886                }
887            }
888        }
889
890        if sum_histogram_buckets > 0 {
891            let selectivity: f64 = (count_matches as f64) / (sum_histogram_buckets as f64);
892            let estimation = (selectivity * (universe_size as f64)).round() as usize;
893            Ok(estimation)
894        } else {
895            Ok(0)
896        }
897    }
898
899    fn guess_max_count_regex(&self, ns: Option<&str>, name: &str, pattern: &str) -> Result<usize> {
900        let full_match_pattern = util::regex_full_match(pattern);
901
902        // Get the total number of annotations with the namespace/name. We
903        // can't get larger than this number
904        let total = self.number_of_annotations_by_name(ns, name)?;
905
906        // Try to parse the regular expression
907        let parsed = Parser::new().parse(&full_match_pattern);
908        if let Ok(parsed) = parsed {
909            let mut guessed_count = 0;
910
911            // Add the guessed count for each prefix
912            if let Some(prefix_set) = regex_syntax::hir::literal::Extractor::new()
913                .extract(&parsed)
914                .literals()
915            {
916                for val_prefix in prefix_set {
917                    let val_prefix = std::str::from_utf8(val_prefix.as_bytes());
918                    if let Ok(lower_val) = val_prefix {
919                        let mut upper_val = String::from(lower_val);
920                        upper_val.push(std::char::MAX);
921                        guessed_count += self.guess_max_count(ns, name, lower_val, &upper_val)?;
922                    }
923                }
924            } else {
925                // For regular expressions without a prefix the worst case would be `.*[X].*` where `[X]` are the most common characters.
926                // Sample values from the histogram to get a better estimation of how many percent of the actual values could match.
927                if let Ok(pattern) = regex::Regex::new(&full_match_pattern) {
928                    let mut rng = rand::rng();
929                    let qualified_keys = match ns {
930                        Some(ns) => vec![AnnoKey {
931                            name: name.into(),
932                            ns: ns.into(),
933                        }],
934                        None => self.get_qnames(name)?,
935                    };
936                    for anno_key in qualified_keys {
937                        let anno_size = self
938                            .anno_key_sizes
939                            .get(&anno_key)
940                            .copied()
941                            .unwrap_or_default();
942                        if let Some(histo) = self.histogram_bounds.get(&anno_key)
943                            && !histo.is_empty()
944                        {
945                            let sampled_values = histo.iter().choose_multiple(&mut rng, 20);
946
947                            let matches = sampled_values
948                                .iter()
949                                .filter(|v| pattern.is_match(v))
950                                .count();
951                            if sampled_values.len() == matches {
952                                // Assume all values match
953                                guessed_count += anno_size;
954                            } else if matches == 0 {
955                                // No match found, but use the bucket size as pessimistic guess
956                                guessed_count +=
957                                    (anno_size as f64 / sampled_values.len() as f64) as usize;
958                            } else {
959                                // Use the percent of matched values to guess the overall number
960                                let match_ratio = (matches as f64) / (sampled_values.len() as f64);
961                                guessed_count += ((anno_size as f64) * match_ratio) as usize;
962                            }
963                        }
964                    }
965                }
966            }
967
968            Ok(guessed_count.min(total))
969        } else {
970            Ok(0)
971        }
972    }
973
974    fn guess_most_frequent_value(
975        &self,
976        ns: Option<&str>,
977        name: &str,
978    ) -> Result<Option<Cow<'_, str>>> {
979        // find all complete keys which have the given name (and namespace if given)
980        let qualified_keys = match ns {
981            Some(ns) => vec![AnnoKey {
982                name: name.into(),
983                ns: ns.into(),
984            }],
985            None => self.get_qnames(name)?,
986        };
987
988        let mut sampled_values: HashMap<&str, usize> = HashMap::default();
989
990        // guess for each fully qualified annotation key
991        for anno_key in qualified_keys {
992            if let Some(histo) = self.histogram_bounds.get(&anno_key) {
993                for v in histo.iter() {
994                    let count: &mut usize = sampled_values.entry(v).or_insert(0);
995                    *count += 1;
996                }
997            }
998        }
999        // find the value which is most frequent
1000        if !sampled_values.is_empty() {
1001            let mut max_count = 0;
1002            let mut max_value = Cow::Borrowed("");
1003            for (v, count) in sampled_values.into_iter() {
1004                if count >= max_count {
1005                    max_value = Cow::Borrowed(v);
1006                    max_count = count;
1007                }
1008            }
1009            Ok(Some(max_value))
1010        } else {
1011            Ok(None)
1012        }
1013    }
1014
1015    fn get_all_values(
1016        &self,
1017        key: &AnnoKey,
1018        most_frequent_first: bool,
1019    ) -> Result<Vec<Cow<'_, str>>> {
1020        if most_frequent_first {
1021            let mut values_with_count: HashMap<String, usize> = HashMap::default();
1022            for item in self.get_by_anno_qname_range(key) {
1023                let (data, _) = item?;
1024                let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1025
1026                let count = values_with_count.entry(val).or_insert(0);
1027                *count += 1;
1028            }
1029            let mut values_with_count: Vec<(usize, Cow<str>)> = values_with_count
1030                .into_iter()
1031                .map(|(val, count)| (count, Cow::Owned(val)))
1032                .collect();
1033            values_with_count.sort();
1034            let result = values_with_count
1035                .into_iter()
1036                .map(|(_count, val)| val)
1037                .collect();
1038            Ok(result)
1039        } else {
1040            let values_unique: Result<HashSet<Cow<str>>> = self
1041                .get_by_anno_qname_range(key)
1042                .map(|item| {
1043                    let (data, _) = item?;
1044                    let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1045                    Ok(Cow::Owned(val))
1046                })
1047                .collect();
1048            Ok(values_unique?.into_iter().collect())
1049        }
1050    }
1051
1052    fn annotation_keys(&self) -> Result<Vec<AnnoKey>> {
1053        Ok(self.anno_key_sizes.keys().cloned().collect())
1054    }
1055
1056    fn get_largest_item(&self) -> Result<Option<T>> {
1057        Ok(self.largest_item.clone())
1058    }
1059
1060    fn calculate_statistics(&mut self) -> Result<()> {
1061        let max_histogram_buckets = 250;
1062        let max_sampled_annotations = 2500;
1063
1064        self.histogram_bounds.clear();
1065
1066        // collect statistics for each annotation key separately
1067        for anno_key in self.anno_key_sizes.keys() {
1068            // sample a maximal number of annotation values
1069            let mut rng = rand::rng();
1070
1071            let all_values_for_key = self.get_by_anno_qname_range(anno_key);
1072
1073            let sampled_anno_values: Result<Vec<String>> = all_values_for_key
1074                .choose_multiple(&mut rng, max_sampled_annotations)
1075                .into_iter()
1076                .map(|data| {
1077                    let (data, _) = data?;
1078                    let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1079                    Ok(val)
1080                })
1081                .collect();
1082            let mut sampled_anno_values = sampled_anno_values?;
1083
1084            // create uniformly distributed histogram bounds
1085            sampled_anno_values.sort();
1086
1087            let num_hist_bounds = if sampled_anno_values.len() < (max_histogram_buckets + 1) {
1088                sampled_anno_values.len()
1089            } else {
1090                max_histogram_buckets + 1
1091            };
1092
1093            let hist = self.histogram_bounds.entry(anno_key.clone()).or_default();
1094
1095            if num_hist_bounds >= 2 {
1096                hist.resize(num_hist_bounds, String::from(""));
1097
1098                let delta: usize = (sampled_anno_values.len() - 1) / (num_hist_bounds - 1);
1099                let delta_fraction: usize = (sampled_anno_values.len() - 1) % (num_hist_bounds - 1);
1100
1101                let mut pos = 0;
1102                let mut pos_fraction = 0;
1103                for hist_item in hist.iter_mut() {
1104                    hist_item.clone_from(&sampled_anno_values[pos]);
1105                    pos += delta;
1106                    pos_fraction += delta_fraction;
1107
1108                    if pos_fraction >= (num_hist_bounds - 1) {
1109                        pos += 1;
1110                        pos_fraction -= num_hist_bounds - 1;
1111                    }
1112                }
1113            }
1114        }
1115        Ok(())
1116    }
1117
1118    fn load_annotations_from(&mut self, location: &Path) -> Result<()> {
1119        let location = location.join(SUBFOLDER_NAME);
1120
1121        if !self.location.eq(&location) {
1122            self.by_container = DiskMap::new(
1123                Some(&location.join("by_container.bin")),
1124                EVICTION_STRATEGY,
1125                BLOCK_CACHE_CAPACITY,
1126                BtreeConfig::default().fixed_value_size(T::key_size() + 9),
1127            )?;
1128            self.by_anno_qname = DiskMap::new(
1129                Some(&location.join("by_anno_qname.bin")),
1130                EVICTION_STRATEGY,
1131                BLOCK_CACHE_CAPACITY,
1132                BtreeConfig::default(),
1133            )?;
1134        }
1135
1136        // load internal helper fields
1137        let f = std::fs::File::open(location.join("custom.bin"))?;
1138        let mut reader = std::io::BufReader::new(f);
1139        self.largest_item = bincode::deserialize_from(&mut reader)?;
1140        self.anno_key_sizes = bincode::deserialize_from(&mut reader)?;
1141        self.histogram_bounds = bincode::deserialize_from(&mut reader)?;
1142        self.anno_key_symbols = bincode::deserialize_from(&mut reader)?;
1143        self.anno_key_symbols.after_deserialization();
1144
1145        Ok(())
1146    }
1147
1148    fn save_annotations_to(&self, location: &Path) -> Result<()> {
1149        let location = location.join(SUBFOLDER_NAME);
1150
1151        // write out the disk maps to a single sorted string table
1152        self.by_container
1153            .write_to(&location.join("by_container.bin"))?;
1154        self.by_anno_qname
1155            .write_to(&location.join("by_anno_qname.bin"))?;
1156
1157        // save the other custom fields
1158        let f = std::fs::File::create(location.join("custom.bin"))?;
1159        let mut writer = std::io::BufWriter::new(f);
1160        bincode::serialize_into(&mut writer, &self.largest_item)?;
1161        bincode::serialize_into(&mut writer, &self.anno_key_sizes)?;
1162        bincode::serialize_into(&mut writer, &self.histogram_bounds)?;
1163        bincode::serialize_into(&mut writer, &self.anno_key_symbols)?;
1164
1165        Ok(())
1166    }
1167}
1168
1169impl NodeAnnotationStorage for AnnoStorageImpl<NodeID> {
1170    fn get_node_id_from_name(&self, node_name: &str) -> Result<Option<NodeID>> {
1171        if let Some(node_name_symbol) = self.anno_key_symbols.get_symbol(&NODE_NAME_KEY) {
1172            let lower_bound = create_by_anno_qname_key(NodeID::MIN, node_name_symbol, node_name);
1173            let upper_bound = create_by_anno_qname_key(NodeID::MAX, node_name_symbol, node_name);
1174
1175            let mut results = self.by_anno_qname.range(lower_bound..=upper_bound);
1176            if let Some(item) = results.next() {
1177                let (data, _) = item?;
1178                let item_id = NodeID::parse_key(&data[data.len() - NodeID::key_size()..])?;
1179                return Ok(Some(item_id));
1180            }
1181        }
1182        Ok(None)
1183    }
1184
1185    fn has_node_name(&self, node_name: &str) -> Result<bool> {
1186        if let Some(node_name_symbol) = self.anno_key_symbols.get_symbol(&NODE_NAME_KEY) {
1187            let lower_bound = create_by_anno_qname_key(NodeID::MIN, node_name_symbol, node_name);
1188            let upper_bound = create_by_anno_qname_key(NodeID::MAX, node_name_symbol, node_name);
1189
1190            let mut results = self.by_anno_qname.range(lower_bound..=upper_bound);
1191            return Ok(results.next().is_some());
1192        }
1193        Ok(false)
1194    }
1195}
1196
// Marks the on-disk storage for `Edge` items as an `EdgeAnnotationStorage`.
// The impl body is empty, so no trait item is defined or overridden here.
impl EdgeAnnotationStorage for AnnoStorageImpl<Edge> {}
1198
1199#[cfg(test)]
1200mod tests;