1use crate::annostorage::AnnotationStorage;
2use crate::annostorage::symboltable::SymbolTable;
3use crate::annostorage::{Match, ValueSearch};
4use crate::errors::Result;
5use crate::graph::NODE_NAME_KEY;
6use crate::serializer::{FixedSizeKeySerializer, KeySerializer};
7use crate::types::{AnnoKey, Annotation, Edge, NodeID};
8use crate::util::disk_collections::{DiskMap, EvictionStrategy};
9use crate::{try_as_boxed_iter, util};
10use core::ops::Bound::*;
11use itertools::Itertools;
12use rand::seq::IteratorRandom;
13use regex_syntax::Parser;
14use regex_syntax::hir::literal::Seq;
15use serde_bytes::ByteBuf;
16use std::borrow::Cow;
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use transient_btree_index::BtreeConfig;
21
22use super::{EdgeAnnotationStorage, NodeAnnotationStorage};
23
24pub const SUBFOLDER_NAME: &str = "nodes_diskmap_v1";
25
26const EVICTION_STRATEGY: EvictionStrategy = EvictionStrategy::MaximumItems(10_000);
27pub const BLOCK_CACHE_CAPACITY: usize = 32 * crate::util::disk_collections::MB;
28
29pub struct AnnoStorageImpl<T>
31where
32 T: FixedSizeKeySerializer
33 + Send
34 + Sync
35 + Clone
36 + serde::ser::Serialize
37 + serde::de::DeserializeOwned,
38{
39 by_container: DiskMap<ByteBuf, String>,
40 by_anno_qname: DiskMap<ByteBuf, bool>,
41 location: PathBuf,
42 #[allow(dead_code)]
45 temp_dir: Option<tempfile::TempDir>,
46
47 anno_key_symbols: SymbolTable<AnnoKey>,
48
49 anno_key_sizes: BTreeMap<AnnoKey, usize>,
50
51 histogram_bounds: BTreeMap<AnnoKey, Vec<String>>,
54 largest_item: Option<T>,
55
56 phantom: std::marker::PhantomData<T>,
57}
58
59fn create_by_container_key<T: FixedSizeKeySerializer>(item: T, anno_key_symbol: usize) -> ByteBuf {
66 let mut result: ByteBuf = ByteBuf::from(item.create_key().to_vec());
67 result.extend(anno_key_symbol.create_key());
68 result
69}
70
71fn create_by_anno_qname_key<T: FixedSizeKeySerializer>(
81 item: T,
82 anno_key_symbol: usize,
83 anno_value: &str,
84) -> ByteBuf {
85 let mut result: ByteBuf = ByteBuf::from(anno_key_symbol.create_key().to_vec());
87 for b in anno_value.as_bytes() {
88 result.push(*b);
89 }
90 result.push(0);
91 let item_key: &[u8] = &item.create_key();
92 result.extend(item_key);
93 result
94}
95
96impl<T> AnnoStorageImpl<T>
97where
98 T: FixedSizeKeySerializer
99 + Send
100 + Sync
101 + Clone
102 + Default
103 + serde::ser::Serialize
104 + serde::de::DeserializeOwned,
105 (T, Arc<AnnoKey>): Into<Match>,
106{
107 pub fn new(path: Option<PathBuf>) -> Result<AnnoStorageImpl<T>> {
108 if let Some(path) = path {
109 let path_by_container = path.join("by_container.bin");
110 let path_by_anno_qname = path.join("by_anno_qname.bin");
111
112 let mut result = AnnoStorageImpl {
113 by_container: DiskMap::new(
114 Some(&path_by_container),
115 EVICTION_STRATEGY,
116 BLOCK_CACHE_CAPACITY,
117 BtreeConfig::default().fixed_key_size(T::key_size() + 16),
118 )?,
119 by_anno_qname: DiskMap::new(
120 Some(&path_by_anno_qname),
121 EVICTION_STRATEGY,
122 BLOCK_CACHE_CAPACITY,
123 BtreeConfig::default(),
124 )?,
125 anno_key_symbols: SymbolTable::default(),
126 anno_key_sizes: BTreeMap::new(),
127 largest_item: None,
128 histogram_bounds: BTreeMap::new(),
129 location: path.clone(),
130 temp_dir: None,
131 phantom: std::marker::PhantomData,
132 };
133
134 let custom_path = path.join("custom.bin");
136 let f = std::fs::File::open(custom_path)?;
137 let mut reader = std::io::BufReader::new(f);
138 result.largest_item = bincode::deserialize_from(&mut reader)?;
139 result.anno_key_sizes = bincode::deserialize_from(&mut reader)?;
140 result.histogram_bounds = bincode::deserialize_from(&mut reader)?;
141 result.anno_key_symbols = bincode::deserialize_from(&mut reader)?;
142 result.anno_key_symbols.after_deserialization();
143
144 Ok(result)
145 } else {
146 let tmp_dir = tempfile::Builder::new()
147 .prefix("graphannis-ondisk-nodeanno-")
148 .tempdir()?;
149 Ok(AnnoStorageImpl {
150 by_container: DiskMap::new_temporary(
151 EVICTION_STRATEGY,
152 BLOCK_CACHE_CAPACITY,
153 BtreeConfig::default().fixed_key_size(T::key_size() + 16),
154 ),
155 by_anno_qname: DiskMap::new_temporary(
156 EVICTION_STRATEGY,
157 BLOCK_CACHE_CAPACITY,
158 BtreeConfig::default(),
159 ),
160 anno_key_symbols: SymbolTable::default(),
161 anno_key_sizes: BTreeMap::new(),
162 largest_item: None,
163 histogram_bounds: BTreeMap::new(),
164 location: tmp_dir.as_ref().to_path_buf(),
165 temp_dir: Some(tmp_dir),
166 phantom: std::marker::PhantomData,
167 })
168 }
169 }
170
171 fn matching_items<'a>(
172 &'a self,
173 namespace: Option<&str>,
174 name: &str,
175 value: Option<&str>,
176 ) -> Box<dyn Iterator<Item = Result<(T, Arc<AnnoKey>)>> + 'a>
177 where
178 T: FixedSizeKeySerializer + Send + Sync + PartialOrd,
179 {
180 let key_ranges: Vec<Arc<AnnoKey>> = if let Some(ns) = namespace {
181 vec![Arc::from(AnnoKey {
182 ns: ns.into(),
183 name: name.into(),
184 })]
185 } else {
186 let qnames = match self.get_qnames(name) {
187 Ok(qnames) => qnames,
188 Err(e) => return Box::new(std::iter::once(Err(e))),
189 };
190 qnames.into_iter().map(Arc::from).collect()
191 };
192
193 let value = value.map(|v| v.to_string());
194
195 let it = key_ranges
196 .into_iter()
197 .filter_map(move |k| self.anno_key_symbols.get_symbol(&k))
198 .flat_map(move |anno_key_symbol| {
199 let lower_bound_value = if let Some(value) = &value { value } else { "" };
200 let lower_bound =
201 create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, lower_bound_value);
202
203 let upper_bound_value = if let Some(value) = &value {
204 Cow::Borrowed(value)
205 } else {
206 Cow::Owned(std::char::MAX.to_string())
207 };
208
209 let upper_bound =
210 create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &upper_bound_value);
211 self.by_anno_qname.range(lower_bound..upper_bound)
212 })
213 .fuse()
214 .map(move |item| match item {
215 Ok((data, _)) => {
216 let item_id = T::parse_key(&data[data.len() - T::key_size()..])?;
218 let anno_key_symbol = usize::parse_key(&data[0..std::mem::size_of::<usize>()])?;
219 let key = self
220 .anno_key_symbols
221 .get_value(anno_key_symbol)
222 .unwrap_or_default();
223 Ok((item_id, key))
224 }
225 Err(e) => Err(e),
226 });
227
228 Box::new(it)
229 }
230
231 fn matching_items_by_prefix<'a>(
232 &'a self,
233 namespace: Option<&str>,
234 name: &str,
235 prefix: String,
236 ) -> Box<dyn Iterator<Item = Result<(T, Arc<AnnoKey>)>> + 'a>
237 where
238 T: FixedSizeKeySerializer + Send + Sync + PartialOrd,
239 {
240 let key_ranges: Vec<Arc<AnnoKey>> = if let Some(ns) = namespace {
241 vec![Arc::from(AnnoKey {
242 ns: ns.into(),
243 name: name.into(),
244 })]
245 } else {
246 let qnames = match self.get_qnames(name) {
247 Ok(qnames) => qnames,
248 Err(e) => return Box::new(std::iter::once(Err(e))),
249 };
250 qnames.into_iter().map(Arc::from).collect()
251 };
252
253 let it = key_ranges
254 .into_iter()
255 .filter_map(move |k| self.anno_key_symbols.get_symbol(&k))
256 .flat_map(move |anno_key_symbol| {
257 let lower_bound = create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, &prefix);
258
259 let mut upper_val = prefix.clone();
260 upper_val.push(std::char::MAX);
261 let upper_bound =
262 create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &upper_val);
263 self.by_anno_qname.range(lower_bound..upper_bound)
264 })
265 .fuse()
266 .map(move |item| match item {
267 Ok((data, _)) => {
268 let item_id = T::parse_key(&data[data.len() - T::key_size()..])?;
270 let anno_key_symbol = usize::parse_key(&data[0..std::mem::size_of::<usize>()])?;
271 let key = self
272 .anno_key_symbols
273 .get_value(anno_key_symbol)
274 .unwrap_or_default();
275 Ok((item_id, key))
276 }
277 Err(e) => Err(e),
278 });
279
280 Box::new(it)
281 }
282
283 fn parse_by_container_key(&self, data: ByteBuf) -> Result<(T, Arc<AnnoKey>)> {
285 let item = T::parse_key(&data[0..T::key_size()])?;
286 let anno_key_symbol = usize::parse_key(&data[T::key_size()..])?;
287
288 let result = (
289 item,
290 self.anno_key_symbols
291 .get_value(anno_key_symbol)
292 .unwrap_or_default(),
293 );
294 Ok(result)
295 }
296
297 fn parse_by_anno_qname_key(&self, mut data: ByteBuf) -> Result<(T, Arc<AnnoKey>, String)> {
299 let data_len = data.len();
301 let item_id_raw = data.split_off(data_len - T::key_size());
302 let item_id = T::parse_key(&item_id_raw)?;
303
304 data.pop();
306
307 let anno_val_raw = data.split_off(std::mem::size_of::<usize>());
309 let anno_val = String::from_utf8(anno_val_raw)?;
310
311 let anno_key_symbol = usize::parse_key(&data)?;
313
314 let result = (
315 item_id,
316 self.anno_key_symbols
317 .get_value(anno_key_symbol)
318 .unwrap_or_default(),
319 anno_val,
320 );
321 Ok(result)
322 }
323
324 fn get_by_anno_qname_range<'a>(
325 &'a self,
326 anno_key: &AnnoKey,
327 ) -> Box<dyn Iterator<Item = Result<(ByteBuf, bool)>> + 'a> {
328 if let Some(anno_key_symbol) = self.anno_key_symbols.get_symbol(anno_key) {
329 let lower_bound = create_by_anno_qname_key(NodeID::MIN, anno_key_symbol, "");
330
331 let upper_bound =
332 create_by_anno_qname_key(NodeID::MAX, anno_key_symbol, &std::char::MAX.to_string());
333
334 Box::new(self.by_anno_qname.range(lower_bound..upper_bound))
335 } else {
336 Box::from(std::iter::empty())
337 }
338 }
339}
340
341impl<T> AnnotationStorage<T> for AnnoStorageImpl<T>
342where
343 T: FixedSizeKeySerializer
344 + Send
345 + Sync
346 + PartialOrd
347 + Clone
348 + Default
349 + serde::ser::Serialize
350 + serde::de::DeserializeOwned,
351 (T, Arc<AnnoKey>): Into<Match>,
352{
353 fn insert(&mut self, item: T, anno: Annotation) -> Result<()> {
354 let anno_key_symbol = self.anno_key_symbols.insert(anno.key.clone())?;
356
357 let by_container_key = create_by_container_key(item.clone(), anno_key_symbol);
359
360 let item_smaller_than_largest = self
364 .largest_item
365 .as_ref()
366 .is_none_or(|largest_item| item <= *largest_item);
367 let already_existed =
368 item_smaller_than_largest && self.by_container.contains_key(&by_container_key)?;
369 self.by_container
370 .insert(by_container_key, anno.val.clone())?;
371
372 self.by_anno_qname.insert(
375 create_by_anno_qname_key(item.clone(), anno_key_symbol, &anno.val),
376 true,
377 )?;
378
379 if !already_existed {
380 if let Some(largest_item) = self.largest_item.clone() {
382 if largest_item < item {
383 self.largest_item = Some(item);
384 }
385 } else {
386 self.largest_item = Some(item);
387 }
388
389 let anno_key_entry = self.anno_key_sizes.entry(anno.key).or_insert(0);
390 *anno_key_entry += 1;
391 }
392
393 Ok(())
394 }
395
396 fn get_annotations_for_item(&self, item: &T) -> Result<Vec<Annotation>> {
397 let mut result = Vec::default();
398 let start = create_by_container_key(item.clone(), usize::MIN);
399 let end = create_by_container_key(item.clone(), usize::MAX);
400 for anno in self.by_container.range(start..=end) {
401 let (key, val) = anno?;
402 let parsed_key = self.parse_by_container_key(key)?;
403 let anno = Annotation {
404 key: parsed_key.1.as_ref().clone(),
405 val,
406 };
407 result.push(anno);
408 }
409
410 Ok(result)
411 }
412
413 fn remove_item(&mut self, item: &T) -> Result<bool> {
414 let mut result = false;
415
416 let start = create_by_container_key(item.clone(), usize::MIN);
417 let end = create_by_container_key(item.clone(), usize::MAX);
418
419 let annos_in_container: Result<Vec<_>> = self.by_container.range(start..=end).collect();
420 let annos_in_container = annos_in_container?;
421
422 for (by_container_key, _) in annos_in_container {
423 let symbol_id = usize::parse_key(&by_container_key[T::key_size()..])?;
424 let key = self
425 .anno_key_symbols
426 .get_value(symbol_id)
427 .unwrap_or_default();
428
429 if let Some(val) = self.by_container.remove(&by_container_key)? {
430 let anno = Annotation {
432 key: key.as_ref().clone(),
433 val,
434 };
435
436 self.by_anno_qname.remove(&create_by_anno_qname_key(
437 item.clone(),
438 symbol_id,
439 &anno.val,
440 ))?;
441 let new_key_count: usize =
443 if let Some(num_of_keys) = self.anno_key_sizes.get_mut(&key) {
444 *num_of_keys -= 1;
445 *num_of_keys
446 } else {
447 0
448 };
449 if new_key_count == 0 {
451 self.anno_key_sizes.remove(&key);
452 if let Some(id) = self.anno_key_symbols.get_symbol(&key) {
453 self.anno_key_symbols.remove(id);
454 }
455 }
456
457 result = true;
458 }
459 }
460
461 Ok(result)
462 }
463
464 fn remove_annotation_for_item(
465 &mut self,
466 item: &T,
467 key: &AnnoKey,
468 ) -> Result<Option<Cow<'_, str>>> {
469 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
471 let by_container_key = create_by_container_key(item.clone(), symbol_id);
472 if let Some(val) = self.by_container.remove(&by_container_key)? {
473 let anno = Annotation {
475 key: key.clone(),
476 val,
477 };
478
479 self.by_anno_qname.remove(&create_by_anno_qname_key(
480 item.clone(),
481 symbol_id,
482 &anno.val,
483 ))?;
484 let new_key_count: usize =
486 if let Some(num_of_keys) = self.anno_key_sizes.get_mut(key) {
487 *num_of_keys -= 1;
488 *num_of_keys
489 } else {
490 0
491 };
492 if new_key_count == 0 {
494 self.anno_key_sizes.remove(key);
495 if let Some(id) = self.anno_key_symbols.get_symbol(key) {
496 self.anno_key_symbols.remove(id);
497 }
498 }
499
500 return Ok(Some(Cow::Owned(anno.val)));
501 }
502 }
503 Ok(None)
504 }
505
506 fn clear(&mut self) -> Result<()> {
507 self.by_container.clear();
508 self.by_anno_qname.clear();
509
510 self.largest_item = None;
511 self.anno_key_sizes.clear();
512 self.histogram_bounds.clear();
513
514 Ok(())
515 }
516
517 fn get_qnames(&self, name: &str) -> Result<Vec<AnnoKey>> {
518 let it = self.anno_key_sizes.range(
519 AnnoKey {
520 name: name.into(),
521 ns: String::default(),
522 }..,
523 );
524 let mut result: Vec<AnnoKey> = Vec::default();
525 for (k, _) in it {
526 if k.name == name {
527 result.push(k.clone());
528 } else {
529 break;
530 }
531 }
532 Ok(result)
533 }
534
535 fn number_of_annotations(&self) -> Result<usize> {
536 let result = self.by_container.iter()?.count();
537 Ok(result)
538 }
539
540 fn is_empty(&self) -> Result<bool> {
541 self.by_container.is_empty()
542 }
543
544 fn get_value_for_item(&self, item: &T, key: &AnnoKey) -> Result<Option<Cow<'_, str>>> {
545 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
546 let raw = self
547 .by_container
548 .get(&create_by_container_key(item.clone(), symbol_id))?;
549 if let Some(val) = raw {
550 return match val {
551 Cow::Borrowed(val) => Ok(Some(Cow::Borrowed(val.as_str()))),
552 Cow::Owned(val) => Ok(Some(Cow::Owned(val))),
553 };
554 }
555 }
556 Ok(None)
557 }
558
559 fn has_value_for_item(&self, item: &T, key: &AnnoKey) -> Result<bool> {
560 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(key) {
561 let result = self
562 .by_container
563 .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
564 Ok(result)
565 } else {
566 Ok(false)
567 }
568 }
569
570 fn get_keys_for_iterator<'b>(
571 &'b self,
572 ns: Option<&str>,
573 name: Option<&str>,
574 it: Box<
575 dyn Iterator<Item = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>>
576 + 'b,
577 >,
578 ) -> Result<Vec<Match>> {
579 if let Some(name) = name {
580 if let Some(ns) = ns {
581 let key = Arc::from(AnnoKey {
583 ns: ns.into(),
584 name: name.into(),
585 });
586 let mut matches = Vec::new();
587 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
588 let mut container_key = create_by_container_key(T::default(), symbol_id);
590 for item in it {
591 let item = item?;
592 container_key[0..T::key_size()].copy_from_slice(&item.create_key());
595 let does_contain_key = self.by_container.contains_key(&container_key)?;
596 if does_contain_key {
597 matches.push((item, key.clone()).into());
598 }
599 }
600 }
601 Ok(matches)
602 } else {
603 let mut matching_qnames: Vec<(ByteBuf, Arc<AnnoKey>)> = self
604 .get_qnames(name)?
605 .into_iter()
606 .filter_map(|key| {
607 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
608 let serialized_key = create_by_container_key(T::default(), symbol_id);
609 Some((serialized_key, Arc::from(key)))
610 } else {
611 None
612 }
613 })
614 .collect();
615 let mut matches = Vec::new();
617 for item in it {
618 let item = item?;
619 for (container_key, anno_key) in matching_qnames.iter_mut() {
620 container_key[0..T::key_size()].copy_from_slice(&item.create_key());
623 let does_contain_key = self.by_container.contains_key(container_key)?;
624 if does_contain_key {
625 matches.push((item.clone(), anno_key.clone()).into());
626 }
627 }
628 }
629 Ok(matches)
630 }
631 } else {
632 let mut matches = Vec::new();
634 for item in it {
635 let item = item?;
636 let start = create_by_container_key(item.clone(), usize::MIN);
637 let end = create_by_container_key(item, usize::MAX);
638 for anno in self.by_container.range(start..=end) {
639 let (data, _) = anno?;
640 let m = self.parse_by_container_key(data)?.into();
641 matches.push(m);
642 }
643 }
644 Ok(matches)
645 }
646 }
647
648 fn number_of_annotations_by_name(&self, ns: Option<&str>, name: &str) -> Result<usize> {
649 let qualified_keys = match ns {
650 Some(ns) => self.anno_key_sizes.range((
651 Included(AnnoKey {
652 name: name.into(),
653 ns: ns.into(),
654 }),
655 Included(AnnoKey {
656 name: name.into(),
657 ns: ns.into(),
658 }),
659 )),
660 None => self.anno_key_sizes.range(
661 AnnoKey {
662 name: name.into(),
663 ns: String::default(),
664 }..AnnoKey {
665 name: name.into(),
666 ns: std::char::MAX.to_string(),
667 },
668 ),
669 };
670 let mut result = 0;
671 for (_anno_key, anno_size) in qualified_keys {
672 result += anno_size;
673 }
674 Ok(result)
675 }
676
677 fn exact_anno_search<'a>(
678 &'a self,
679 namespace: Option<&str>,
680 name: &str,
681 value: ValueSearch<&str>,
682 ) -> Box<dyn Iterator<Item = Result<Match>> + 'a> {
683 match value {
684 ValueSearch::Any => {
685 let it = self
686 .matching_items(namespace, name, None)
687 .map_ok(|item| item.into());
688 Box::new(it)
689 }
690 ValueSearch::Some(value) => {
691 let it = self
692 .matching_items(namespace, name, Some(value))
693 .map_ok(|item| item.into());
694 Box::new(it)
695 }
696 ValueSearch::NotSome(value) => {
697 let value = value.to_string();
698 let it = self
699 .matching_items(namespace, name, None)
700 .map(move |item| match item {
701 Ok((node, anno_key)) => {
702 let value = self.get_value_for_item(&node, &anno_key)?;
703 Ok((node, anno_key, value))
704 }
705 Err(e) => Err(e),
706 })
707 .filter_map_ok(move |(item, anno_key, item_value)| {
708 if let Some(item_value) = item_value
709 && item_value != value
710 {
711 return Some((item, anno_key).into());
712 }
713 None
714 });
715 Box::new(it)
716 }
717 }
718 }
719
720 fn regex_anno_search<'a>(
721 &'a self,
722 namespace: Option<&str>,
723 name: &str,
724 pattern: &str,
725 negated: bool,
726 ) -> Box<dyn Iterator<Item = Result<Match>> + 'a> {
727 let full_match_pattern = util::regex_full_match(pattern);
728
729 if let Ok((compiled_regex, parsed_regex)) =
730 util::compile_and_parse_regex(&full_match_pattern)
731 {
732 if negated {
733 let it = self
734 .matching_items(namespace, name, None)
735 .map(move |item| match item {
736 Ok((node, anno_key)) => {
737 let value = self.get_value_for_item(&node, &anno_key)?;
738 Ok((node, anno_key, value))
739 }
740 Err(e) => Err(e),
741 })
742 .filter_ok(move |(_, _, val)| {
743 if let Some(val) = val {
744 !compiled_regex.is_match(val)
745 } else {
746 false
747 }
748 })
749 .map_ok(move |(node, anno_key, _val)| (node, anno_key).into());
750 Box::new(it)
751 } else {
752 let regex_literal_sequence = regex_syntax::hir::literal::Extractor::new()
753 .extract(&parsed_regex)
754 .literals()
755 .map(Seq::new);
756
757 let prefix_bytes = regex_literal_sequence
758 .map(|seq| Vec::from(seq.longest_common_prefix().unwrap_or_default()))
759 .unwrap_or_default();
760 let prefix = try_as_boxed_iter!(std::str::from_utf8(&prefix_bytes));
761
762 let it = self
763 .matching_items_by_prefix(namespace, name, prefix.to_string())
764 .map(move |item| match item {
765 Ok((node, anno_key)) => {
766 let value = self.get_value_for_item(&node, &anno_key)?;
767 Ok((node, anno_key, value))
768 }
769 Err(e) => Err(e),
770 })
771 .filter_ok(move |(_, _, val)| {
772 if let Some(val) = val {
773 compiled_regex.is_match(val)
774 } else {
775 false
776 }
777 })
778 .map_ok(move |(node, anno_key, _val)| (node, anno_key).into());
779 Box::new(it)
780 }
781 } else if negated {
782 self.exact_anno_search(namespace, name, None.into())
784 } else {
785 Box::new(std::iter::empty())
787 }
788 }
789
790 fn get_all_keys_for_item(
791 &self,
792 item: &T,
793 ns: Option<&str>,
794 name: Option<&str>,
795 ) -> Result<Vec<Arc<AnnoKey>>> {
796 if let Some(name) = name {
797 if let Some(ns) = ns {
798 let key = Arc::from(AnnoKey {
799 ns: ns.into(),
800 name: name.into(),
801 });
802 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
803 let does_contain_key = self
804 .by_container
805 .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
806 if does_contain_key {
807 return Ok(vec![key.clone()]);
808 }
809 }
810 Ok(vec![])
811 } else {
812 let res: Result<Vec<Arc<AnnoKey>>> = self
814 .get_qnames(name)?
815 .into_iter()
816 .map(|key| {
817 if let Some(symbol_id) = self.anno_key_symbols.get_symbol(&key) {
818 let does_contain_key = self
819 .by_container
820 .contains_key(&create_by_container_key(item.clone(), symbol_id))?;
821 Ok((does_contain_key, key))
822 } else {
823 Ok((false, key))
824 }
825 })
826 .filter_ok(|(does_contain_key, _)| *does_contain_key)
827 .map_ok(|(_, key)| Arc::from(key))
828 .collect();
829 let res = res?;
830 Ok(res)
831 }
832 } else {
833 let result = self
835 .get_annotations_for_item(item)?
836 .into_iter()
837 .map(|anno| Arc::from(anno.key))
838 .collect();
839 Ok(result)
840 }
841 }
842
843 fn guess_max_count(
844 &self,
845 ns: Option<&str>,
846 name: &str,
847 lower_val: &str,
848 upper_val: &str,
849 ) -> Result<usize> {
850 let qualified_keys = match ns {
852 Some(ns) => vec![AnnoKey {
853 name: name.into(),
854 ns: ns.into(),
855 }],
856 None => self.get_qnames(name)?,
857 };
858
859 let mut universe_size: usize = 0;
860 let mut sum_histogram_buckets: usize = 0;
861 let mut count_matches: usize = 0;
862
863 for anno_key in qualified_keys {
865 if let Some(anno_size) = self.anno_key_sizes.get(&anno_key) {
866 universe_size += *anno_size;
867
868 if let Some(histo) = self.histogram_bounds.get(&anno_key) {
869 if histo.len() >= 2 {
873 sum_histogram_buckets += histo.len() - 1;
874
875 for i in 0..histo.len() - 1 {
876 let bucket_begin = &histo[i];
877 let bucket_end = &histo[i + 1];
878 if bucket_begin.as_str() <= upper_val
880 && lower_val <= bucket_end.as_str()
881 {
882 count_matches += 1;
883 }
884 }
885 }
886 }
887 }
888 }
889
890 if sum_histogram_buckets > 0 {
891 let selectivity: f64 = (count_matches as f64) / (sum_histogram_buckets as f64);
892 let estimation = (selectivity * (universe_size as f64)).round() as usize;
893 Ok(estimation)
894 } else {
895 Ok(0)
896 }
897 }
898
899 fn guess_max_count_regex(&self, ns: Option<&str>, name: &str, pattern: &str) -> Result<usize> {
900 let full_match_pattern = util::regex_full_match(pattern);
901
902 let total = self.number_of_annotations_by_name(ns, name)?;
905
906 let parsed = Parser::new().parse(&full_match_pattern);
908 if let Ok(parsed) = parsed {
909 let mut guessed_count = 0;
910
911 if let Some(prefix_set) = regex_syntax::hir::literal::Extractor::new()
913 .extract(&parsed)
914 .literals()
915 {
916 for val_prefix in prefix_set {
917 let val_prefix = std::str::from_utf8(val_prefix.as_bytes());
918 if let Ok(lower_val) = val_prefix {
919 let mut upper_val = String::from(lower_val);
920 upper_val.push(std::char::MAX);
921 guessed_count += self.guess_max_count(ns, name, lower_val, &upper_val)?;
922 }
923 }
924 } else {
925 if let Ok(pattern) = regex::Regex::new(&full_match_pattern) {
928 let mut rng = rand::rng();
929 let qualified_keys = match ns {
930 Some(ns) => vec![AnnoKey {
931 name: name.into(),
932 ns: ns.into(),
933 }],
934 None => self.get_qnames(name)?,
935 };
936 for anno_key in qualified_keys {
937 let anno_size = self
938 .anno_key_sizes
939 .get(&anno_key)
940 .copied()
941 .unwrap_or_default();
942 if let Some(histo) = self.histogram_bounds.get(&anno_key)
943 && !histo.is_empty()
944 {
945 let sampled_values = histo.iter().choose_multiple(&mut rng, 20);
946
947 let matches = sampled_values
948 .iter()
949 .filter(|v| pattern.is_match(v))
950 .count();
951 if sampled_values.len() == matches {
952 guessed_count += anno_size;
954 } else if matches == 0 {
955 guessed_count +=
957 (anno_size as f64 / sampled_values.len() as f64) as usize;
958 } else {
959 let match_ratio = (matches as f64) / (sampled_values.len() as f64);
961 guessed_count += ((anno_size as f64) * match_ratio) as usize;
962 }
963 }
964 }
965 }
966 }
967
968 Ok(guessed_count.min(total))
969 } else {
970 Ok(0)
971 }
972 }
973
974 fn guess_most_frequent_value(
975 &self,
976 ns: Option<&str>,
977 name: &str,
978 ) -> Result<Option<Cow<'_, str>>> {
979 let qualified_keys = match ns {
981 Some(ns) => vec![AnnoKey {
982 name: name.into(),
983 ns: ns.into(),
984 }],
985 None => self.get_qnames(name)?,
986 };
987
988 let mut sampled_values: HashMap<&str, usize> = HashMap::default();
989
990 for anno_key in qualified_keys {
992 if let Some(histo) = self.histogram_bounds.get(&anno_key) {
993 for v in histo.iter() {
994 let count: &mut usize = sampled_values.entry(v).or_insert(0);
995 *count += 1;
996 }
997 }
998 }
999 if !sampled_values.is_empty() {
1001 let mut max_count = 0;
1002 let mut max_value = Cow::Borrowed("");
1003 for (v, count) in sampled_values.into_iter() {
1004 if count >= max_count {
1005 max_value = Cow::Borrowed(v);
1006 max_count = count;
1007 }
1008 }
1009 Ok(Some(max_value))
1010 } else {
1011 Ok(None)
1012 }
1013 }
1014
1015 fn get_all_values(
1016 &self,
1017 key: &AnnoKey,
1018 most_frequent_first: bool,
1019 ) -> Result<Vec<Cow<'_, str>>> {
1020 if most_frequent_first {
1021 let mut values_with_count: HashMap<String, usize> = HashMap::default();
1022 for item in self.get_by_anno_qname_range(key) {
1023 let (data, _) = item?;
1024 let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1025
1026 let count = values_with_count.entry(val).or_insert(0);
1027 *count += 1;
1028 }
1029 let mut values_with_count: Vec<(usize, Cow<str>)> = values_with_count
1030 .into_iter()
1031 .map(|(val, count)| (count, Cow::Owned(val)))
1032 .collect();
1033 values_with_count.sort();
1034 let result = values_with_count
1035 .into_iter()
1036 .map(|(_count, val)| val)
1037 .collect();
1038 Ok(result)
1039 } else {
1040 let values_unique: Result<HashSet<Cow<str>>> = self
1041 .get_by_anno_qname_range(key)
1042 .map(|item| {
1043 let (data, _) = item?;
1044 let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1045 Ok(Cow::Owned(val))
1046 })
1047 .collect();
1048 Ok(values_unique?.into_iter().collect())
1049 }
1050 }
1051
1052 fn annotation_keys(&self) -> Result<Vec<AnnoKey>> {
1053 Ok(self.anno_key_sizes.keys().cloned().collect())
1054 }
1055
1056 fn get_largest_item(&self) -> Result<Option<T>> {
1057 Ok(self.largest_item.clone())
1058 }
1059
1060 fn calculate_statistics(&mut self) -> Result<()> {
1061 let max_histogram_buckets = 250;
1062 let max_sampled_annotations = 2500;
1063
1064 self.histogram_bounds.clear();
1065
1066 for anno_key in self.anno_key_sizes.keys() {
1068 let mut rng = rand::rng();
1070
1071 let all_values_for_key = self.get_by_anno_qname_range(anno_key);
1072
1073 let sampled_anno_values: Result<Vec<String>> = all_values_for_key
1074 .choose_multiple(&mut rng, max_sampled_annotations)
1075 .into_iter()
1076 .map(|data| {
1077 let (data, _) = data?;
1078 let (_, _, val) = self.parse_by_anno_qname_key(data)?;
1079 Ok(val)
1080 })
1081 .collect();
1082 let mut sampled_anno_values = sampled_anno_values?;
1083
1084 sampled_anno_values.sort();
1086
1087 let num_hist_bounds = if sampled_anno_values.len() < (max_histogram_buckets + 1) {
1088 sampled_anno_values.len()
1089 } else {
1090 max_histogram_buckets + 1
1091 };
1092
1093 let hist = self.histogram_bounds.entry(anno_key.clone()).or_default();
1094
1095 if num_hist_bounds >= 2 {
1096 hist.resize(num_hist_bounds, String::from(""));
1097
1098 let delta: usize = (sampled_anno_values.len() - 1) / (num_hist_bounds - 1);
1099 let delta_fraction: usize = (sampled_anno_values.len() - 1) % (num_hist_bounds - 1);
1100
1101 let mut pos = 0;
1102 let mut pos_fraction = 0;
1103 for hist_item in hist.iter_mut() {
1104 hist_item.clone_from(&sampled_anno_values[pos]);
1105 pos += delta;
1106 pos_fraction += delta_fraction;
1107
1108 if pos_fraction >= (num_hist_bounds - 1) {
1109 pos += 1;
1110 pos_fraction -= num_hist_bounds - 1;
1111 }
1112 }
1113 }
1114 }
1115 Ok(())
1116 }
1117
1118 fn load_annotations_from(&mut self, location: &Path) -> Result<()> {
1119 let location = location.join(SUBFOLDER_NAME);
1120
1121 if !self.location.eq(&location) {
1122 self.by_container = DiskMap::new(
1123 Some(&location.join("by_container.bin")),
1124 EVICTION_STRATEGY,
1125 BLOCK_CACHE_CAPACITY,
1126 BtreeConfig::default().fixed_value_size(T::key_size() + 9),
1127 )?;
1128 self.by_anno_qname = DiskMap::new(
1129 Some(&location.join("by_anno_qname.bin")),
1130 EVICTION_STRATEGY,
1131 BLOCK_CACHE_CAPACITY,
1132 BtreeConfig::default(),
1133 )?;
1134 }
1135
1136 let f = std::fs::File::open(location.join("custom.bin"))?;
1138 let mut reader = std::io::BufReader::new(f);
1139 self.largest_item = bincode::deserialize_from(&mut reader)?;
1140 self.anno_key_sizes = bincode::deserialize_from(&mut reader)?;
1141 self.histogram_bounds = bincode::deserialize_from(&mut reader)?;
1142 self.anno_key_symbols = bincode::deserialize_from(&mut reader)?;
1143 self.anno_key_symbols.after_deserialization();
1144
1145 Ok(())
1146 }
1147
1148 fn save_annotations_to(&self, location: &Path) -> Result<()> {
1149 let location = location.join(SUBFOLDER_NAME);
1150
1151 self.by_container
1153 .write_to(&location.join("by_container.bin"))?;
1154 self.by_anno_qname
1155 .write_to(&location.join("by_anno_qname.bin"))?;
1156
1157 let f = std::fs::File::create(location.join("custom.bin"))?;
1159 let mut writer = std::io::BufWriter::new(f);
1160 bincode::serialize_into(&mut writer, &self.largest_item)?;
1161 bincode::serialize_into(&mut writer, &self.anno_key_sizes)?;
1162 bincode::serialize_into(&mut writer, &self.histogram_bounds)?;
1163 bincode::serialize_into(&mut writer, &self.anno_key_symbols)?;
1164
1165 Ok(())
1166 }
1167}
1168
1169impl NodeAnnotationStorage for AnnoStorageImpl<NodeID> {
1170 fn get_node_id_from_name(&self, node_name: &str) -> Result<Option<NodeID>> {
1171 if let Some(node_name_symbol) = self.anno_key_symbols.get_symbol(&NODE_NAME_KEY) {
1172 let lower_bound = create_by_anno_qname_key(NodeID::MIN, node_name_symbol, node_name);
1173 let upper_bound = create_by_anno_qname_key(NodeID::MAX, node_name_symbol, node_name);
1174
1175 let mut results = self.by_anno_qname.range(lower_bound..=upper_bound);
1176 if let Some(item) = results.next() {
1177 let (data, _) = item?;
1178 let item_id = NodeID::parse_key(&data[data.len() - NodeID::key_size()..])?;
1179 return Ok(Some(item_id));
1180 }
1181 }
1182 Ok(None)
1183 }
1184
1185 fn has_node_name(&self, node_name: &str) -> Result<bool> {
1186 if let Some(node_name_symbol) = self.anno_key_symbols.get_symbol(&NODE_NAME_KEY) {
1187 let lower_bound = create_by_anno_qname_key(NodeID::MIN, node_name_symbol, node_name);
1188 let upper_bound = create_by_anno_qname_key(NodeID::MAX, node_name_symbol, node_name);
1189
1190 let mut results = self.by_anno_qname.range(lower_bound..=upper_bound);
1191 return Ok(results.next().is_some());
1192 }
1193 Ok(false)
1194 }
1195}
1196
1197impl EdgeAnnotationStorage for AnnoStorageImpl<Edge> {}
1198
1199#[cfg(test)]
1200mod tests;