1pub mod serialization;
2pub mod storage;
3pub mod update;
4
5use crate::{
6 annostorage::{AnnotationStorage, NodeAnnotationStorage, ValueSearch},
7 errors::Result,
8 graph::storage::{GraphStorage, WriteableGraphStorage, registry},
9};
10use crate::{
11 errors::GraphAnnisCoreError,
12 types::{AnnoKey, Annotation, Component, ComponentType, Edge, NodeID},
13};
14use clru::CLruCache;
15use rayon::prelude::*;
16use std::ops::Bound::Included;
17use std::path::{Path, PathBuf};
18use std::string::ToString;
19use std::{
20 borrow::Cow,
21 sync::{Arc, Mutex},
22};
23use std::{collections::BTreeMap, num::NonZeroUsize};
24use std::{collections::BTreeSet, io::prelude::*};
25use update::{GraphUpdate, UpdateEvent};
26
/// Namespace of the annotations graphANNIS manages itself (e.g. node name and node type).
pub const ANNIS_NS: &str = "annis";
/// Annotation namespace name `default_ns`.
pub const DEFAULT_NS: &str = "default_ns";
/// Annotation name (in [`ANNIS_NS`]) holding the unique name of a node.
pub const NODE_NAME: &str = "node_name";
/// Annotation name (in [`ANNIS_NS`]) holding the type of a node.
pub const NODE_TYPE: &str = "node_type";
/// Directory name used on disk in place of an empty component layer name.
pub const DEFAULT_EMPTY_LAYER: &str = "default_layer";

/// File name of the serialized (TOML) global statistics inside a `current`/`backup` directory.
const GLOBAL_STATISTICS_FILE_NAME: &str = "global_statistics.toml";
34
35lazy_static! {
36 pub static ref DEFAULT_ANNO_KEY: Arc<AnnoKey> = Arc::from(AnnoKey::default());
37 pub static ref NODE_NAME_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
38 ns: ANNIS_NS.into(),
39 name: NODE_NAME.into(),
40 });
41 pub static ref NODE_TYPE_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
43 ns: ANNIS_NS.into(),
44 name: NODE_TYPE.into(),
45 });
46}
47
/// A graph made of node annotations and a set of edge components.
///
/// `CT` is the component type that determines which components exist and provides the
/// model-specific hooks (update events, graph index, global statistics).
pub struct Graph<CT: ComponentType> {
    /// Annotations of all nodes, including the `annis::node_name` and `annis::node_type`
    /// labels inserted for every added node.
    node_annos: Box<dyn NodeAnnotationStorage>,

    /// Directory the graph is persisted in. Set by `open`/`persist_to`; `None` for graphs
    /// created with `new` or loaded with `import`.
    location: Option<PathBuf>,

    /// All known components. A `None` value means the component was found on disk but its
    /// graph storage has not been loaded yet.
    components: BTreeMap<Component<CT>, Option<Arc<dyn GraphStorage>>>,
    /// ID of the last update event that was applied.
    current_change_id: u64,

    /// Lock that serializes background persistence (`background_sync_wal_updates`).
    background_persistance: Arc<Mutex<()>>,

    /// Model-specific global statistics, if they were calculated or loaded from disk.
    pub global_statistics: Option<CT::GlobalStatistics>,

    /// Whether the node annotations use the on-disk storage implementation.
    disk_based: bool,
}
68
69fn load_component_from_disk(component_path: &Path) -> Result<Arc<dyn GraphStorage>> {
70 let impl_path = PathBuf::from(component_path).join("impl.cfg");
72 let mut f_impl = std::fs::File::open(impl_path)?;
73 let mut impl_name = String::new();
74 f_impl.read_to_string(&mut impl_name)?;
75
76 let gs = registry::deserialize(&impl_name, component_path)?;
77
78 Ok(gs)
79}
80
81fn component_to_relative_path<CT: ComponentType>(c: &Component<CT>) -> PathBuf {
82 let mut p = PathBuf::new();
83 p.push("gs");
84 p.push(c.get_type().to_string());
85 p.push(if c.layer.is_empty() {
86 DEFAULT_EMPTY_LAYER
87 } else {
88 &c.layer
89 });
90 p.push(c.name.as_str());
91 p
92}
93
94fn component_path<CT: ComponentType>(
95 location: &Option<PathBuf>,
96 c: &Component<CT>,
97) -> Option<PathBuf> {
98 match location {
99 Some(loc) => {
100 let mut p = PathBuf::from(loc);
101 let backup = loc.join("backup");
103 if backup.exists() {
104 p.push("backup");
105 } else {
106 p.push("current");
107 }
108 p.push(component_to_relative_path(c));
109 Some(p)
110 }
111 None => None,
112 }
113}
114
115pub fn find_components_from_disk<CT: ComponentType, P: AsRef<Path>>(
117 location: P,
118) -> Result<BTreeSet<Component<CT>>> {
119 let mut result = BTreeSet::new();
120 for c in CT::all_component_types().into_iter() {
122 let cpath = PathBuf::from(location.as_ref())
123 .join("gs")
124 .join(c.to_string());
125
126 if cpath.is_dir() {
127 for layer in cpath.read_dir()? {
129 let layer = layer?;
130 if layer.path().is_dir() {
131 let layer_file_name = layer.file_name();
133 let layer_name_from_file = layer_file_name.to_string_lossy();
134 let layer_name = if layer_name_from_file == DEFAULT_EMPTY_LAYER {
135 String::default()
136 } else {
137 layer_name_from_file.into()
138 };
139 let empty_name_component =
140 Component::new(c.clone(), layer_name.clone(), String::default());
141 {
142 let cfg_file = PathBuf::from(location.as_ref())
143 .join(component_to_relative_path(&empty_name_component))
144 .join("impl.cfg");
145
146 if cfg_file.is_file() {
147 result.insert(empty_name_component.clone());
148 debug!("Registered component {}", empty_name_component);
149 }
150 }
151 for name in layer.path().read_dir()? {
153 let name = name?;
154 let named_component = Component::new(
155 c.clone(),
156 layer_name.clone(),
157 name.file_name().to_string_lossy().into(),
158 );
159 let cfg_file = PathBuf::from(location.as_ref())
160 .join(component_to_relative_path(&named_component))
161 .join("impl.cfg");
162
163 if cfg_file.is_file() {
164 result.insert(named_component.clone());
165 debug!("Registered component {}", named_component);
166 }
167 }
168 }
169 }
170 }
171 } Ok(result)
173}
174
175impl<CT: ComponentType> Graph<CT> {
176 pub fn new(disk_based: bool) -> Result<Self> {
178 let node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
179 Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
180 } else {
181 Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
182 };
183
184 Ok(Graph {
185 node_annos,
186 components: BTreeMap::new(),
187 global_statistics: None,
188 location: None,
189
190 current_change_id: 0,
191
192 background_persistance: Arc::new(Mutex::new(())),
193
194 disk_based,
195 })
196 }
197
198 pub fn with_default_graphstorages(disk_based: bool) -> Result<Self> {
200 let mut db = Graph::new(disk_based)?;
201 for c in CT::default_components() {
202 db.get_or_create_writable(&c)?;
203 }
204 Ok(db)
205 }
206
207 pub fn open(&mut self, location: &Path) -> Result<()> {
212 debug!("Opening corpus from {}", location.to_string_lossy());
213 self.clear()?;
214 self.location = Some(location.to_path_buf());
215 self.internal_open(location)?;
216
217 Ok(())
218 }
219
220 pub fn import(&mut self, location: &Path) -> Result<()> {
226 debug!("Importing corpus from {}", location.to_string_lossy());
227 self.clear()?;
228 self.location = Some(location.to_path_buf());
230 self.internal_open(location)?;
231 self.ensure_loaded_all()?;
232 self.location = None;
234 Ok(())
235 }
236
237 #[deprecated(note = "Please use `open` instead")]
243 pub fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
244 self.open(location)?;
245 if preload {
246 self.ensure_loaded_all()?;
247 }
248 Ok(())
249 }
250
251 fn internal_open(&mut self, location: &Path) -> Result<()> {
257 let backup = location.join("backup");
258
259 let mut load_from_backup = false;
260 let dir2load = if backup.exists() && backup.is_dir() {
261 load_from_backup = true;
262 backup.clone()
263 } else {
264 location.join("current")
265 };
266
267 self.global_statistics = None;
269 let global_statistics_file = dir2load.join(GLOBAL_STATISTICS_FILE_NAME);
270 if global_statistics_file.exists() && global_statistics_file.is_file() {
271 let file_content = std::fs::read_to_string(global_statistics_file)?;
272 self.global_statistics = Some(toml::from_str(&file_content)?);
273 }
274
275 let ondisk_subdirectory = dir2load.join(crate::annostorage::ondisk::SUBFOLDER_NAME);
277 if ondisk_subdirectory.exists() && ondisk_subdirectory.is_dir() {
278 self.disk_based = true;
279 let node_annos_tmp =
281 crate::annostorage::ondisk::AnnoStorageImpl::new(Some(ondisk_subdirectory))?;
282 self.node_annos = Box::new(node_annos_tmp);
283 } else {
284 self.disk_based = false;
286 let mut node_annos_tmp = crate::annostorage::inmemory::AnnoStorageImpl::new();
287 node_annos_tmp.load_annotations_from(&dir2load)?;
288 self.node_annos = Box::new(node_annos_tmp);
289 }
290
291 let log_path = dir2load.join("update_log.bin");
292
293 let logfile_exists = log_path.exists() && log_path.is_file();
294
295 self.components = find_components_from_disk(&dir2load)?
296 .into_iter()
297 .map(|c| (c, None))
298 .collect();
299
300 if logfile_exists | load_from_backup {
302 self.ensure_loaded_all()?;
303 }
304
305 if logfile_exists {
306 let log_reader = std::fs::File::open(&log_path)?;
308 let mut update = bincode::deserialize_from(log_reader)?;
309 self.apply_update_in_memory(&mut update, true, |_| {})?;
310 } else {
311 self.current_change_id = 0;
312 }
313
314 if load_from_backup {
315 self.save_to(&location.join("current"))?;
317 let tmp_dir = tempfile::Builder::new()
319 .prefix("temporary-graphannis-backup")
320 .tempdir_in(location)?;
321 std::fs::remove_dir(tmp_dir.path())?;
323 std::fs::rename(&backup, tmp_dir.path())?;
324 tmp_dir.close()?;
326 }
327
328 Ok(())
329 }
330
331 pub fn save_to(&mut self, location: &Path) -> Result<()> {
333 self.ensure_loaded_all()?;
335 self.internal_save(&location.join("current"))
336 }
337
338 pub fn persist_to(&mut self, location: &Path) -> Result<()> {
340 self.location = Some(location.to_path_buf());
341 self.internal_save(&location.join("current"))
342 }
343
344 fn clear(&mut self) -> Result<()> {
347 self.node_annos = Box::new(crate::annostorage::inmemory::AnnoStorageImpl::new());
348 self.components.clear();
349 Ok(())
350 }
351
352 fn internal_save(&self, location: &Path) -> Result<()> {
353 let location = PathBuf::from(location);
354
355 std::fs::create_dir_all(&location)?;
356
357 self.node_annos.save_annotations_to(&location)?;
358
359 for (c, e) in &self.components {
360 if let Some(ref data) = *e {
361 let dir = PathBuf::from(&location).join(component_to_relative_path(c));
362 std::fs::create_dir_all(&dir)?;
363
364 let impl_name = data.serialization_id();
365 data.save_to(&dir)?;
366
367 let cfg_path = PathBuf::from(&dir).join("impl.cfg");
368 let mut f_cfg = std::fs::File::create(cfg_path)?;
369 f_cfg.write_all(impl_name.as_bytes())?;
370 }
371 }
372
373 if let Some(s) = &self.global_statistics {
375 let file_content = toml::to_string(s)?;
376 std::fs::write(location.join(GLOBAL_STATISTICS_FILE_NAME), file_content)?;
377 }
378
379 Ok(())
380 }
381
382 fn get_cached_node_id_from_name(
383 &self,
384 node_name: Cow<str>,
385 cache: &mut CLruCache<String, Option<NodeID>>,
386 ) -> Result<Option<NodeID>> {
387 if let Some(id) = cache.get(node_name.as_ref()) {
388 Ok(*id)
389 } else {
390 let id = self.node_annos.get_node_id_from_name(&node_name)?;
391 cache.put(node_name.to_string(), id);
392 Ok(id)
393 }
394 }
395
396 #[allow(clippy::cognitive_complexity)]
397 fn apply_update_in_memory<F>(
398 &mut self,
399 u: &mut GraphUpdate,
400 update_statistics: bool,
401 progress_callback: F,
402 ) -> Result<()>
403 where
404 F: Fn(&str),
405 {
406 let all_components = self.get_all_components(None, None);
407
408 let mut update_graph_index = ComponentType::init_update_graph_index(self)?;
409 let cache_size = NonZeroUsize::new(1_000).ok_or(GraphAnnisCoreError::ZeroCacheSize)?;
411 let mut node_id_cache = CLruCache::new(cache_size);
412 let total_nr_updates = u.len()?;
414 progress_callback(&format!("applying {} atomic updates", total_nr_updates));
415 for (nr_updates, update_event) in u.iter()?.enumerate() {
416 let (id, change) = update_event?;
417 trace!("applying event {:?}", &change);
418 ComponentType::before_update_event(&change, self, &mut update_graph_index)?;
419 match &change {
420 UpdateEvent::AddNode {
421 node_name,
422 node_type,
423 } => {
424 if !self.node_annos.has_node_name(node_name)? {
426 let new_node_id: NodeID =
427 if let Some(id) = self.node_annos.get_largest_item()? {
428 id + 1
429 } else {
430 0
431 };
432
433 let new_anno_name = Annotation {
434 key: NODE_NAME_KEY.as_ref().clone(),
435 val: node_name.into(),
436 };
437 let new_anno_type = Annotation {
438 key: NODE_TYPE_KEY.as_ref().clone(),
439 val: node_type.into(),
440 };
441
442 self.node_annos.insert(new_node_id, new_anno_name)?;
444 self.node_annos.insert(new_node_id, new_anno_type)?;
445
446 node_id_cache.put(node_name.clone(), Some(new_node_id));
448 }
449 }
450 UpdateEvent::DeleteNode { node_name } => {
451 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
452 Cow::Borrowed(node_name),
453 &mut node_id_cache,
454 )? {
455 self.node_annos.remove_item(&existing_node_id)?;
457
458 for c in all_components.iter() {
460 if let Ok(gs) = self.get_or_create_writable(c) {
461 gs.delete_node(existing_node_id)?;
462 }
463 }
464
465 node_id_cache.put(node_name.clone(), None);
467 }
468 }
469 UpdateEvent::AddNodeLabel {
470 node_name,
471 anno_ns,
472 anno_name,
473 anno_value,
474 } => {
475 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
476 Cow::Borrowed(node_name),
477 &mut node_id_cache,
478 )? {
479 let anno = Annotation {
480 key: AnnoKey {
481 ns: anno_ns.into(),
482 name: anno_name.into(),
483 },
484 val: anno_value.into(),
485 };
486 self.node_annos.insert(existing_node_id, anno)?;
487 }
488 }
489 UpdateEvent::DeleteNodeLabel {
490 node_name,
491 anno_ns,
492 anno_name,
493 } => {
494 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
495 Cow::Borrowed(node_name),
496 &mut node_id_cache,
497 )? {
498 let key = AnnoKey {
499 ns: anno_ns.into(),
500 name: anno_name.into(),
501 };
502 self.node_annos
503 .remove_annotation_for_item(&existing_node_id, &key)?;
504 }
505 }
506 UpdateEvent::AddEdge {
507 source_node,
508 target_node,
509 layer,
510 component_type,
511 component_name,
512 } => {
513 let source = self.get_cached_node_id_from_name(
514 Cow::Borrowed(source_node),
515 &mut node_id_cache,
516 )?;
517 let target = self.get_cached_node_id_from_name(
518 Cow::Borrowed(target_node),
519 &mut node_id_cache,
520 )?;
521 if let (Some(source), Some(target)) = (source, target)
523 && let Ok(ctype) = CT::from_str(component_type)
524 {
525 let c = Component::new(ctype, layer.into(), component_name.into());
526 let gs = self.get_or_create_writable(&c)?;
527 gs.add_edge(Edge { source, target })?;
528 }
529 }
530 UpdateEvent::DeleteEdge {
531 source_node,
532 target_node,
533 layer,
534 component_type,
535 component_name,
536 } => {
537 let source = self.get_cached_node_id_from_name(
538 Cow::Borrowed(source_node),
539 &mut node_id_cache,
540 )?;
541 let target = self.get_cached_node_id_from_name(
542 Cow::Borrowed(target_node),
543 &mut node_id_cache,
544 )?;
545 if let (Some(source), Some(target)) = (source, target)
546 && let Ok(ctype) = CT::from_str(component_type)
547 {
548 let c = Component::new(ctype, layer.into(), component_name.into());
549
550 let gs = self.get_or_create_writable(&c)?;
551 gs.delete_edge(&Edge { source, target })?;
552 }
553 }
554 UpdateEvent::AddEdgeLabel {
555 source_node,
556 target_node,
557 layer,
558 component_type,
559 component_name,
560 anno_ns,
561 anno_name,
562 anno_value,
563 } => {
564 let source = self.get_cached_node_id_from_name(
565 Cow::Borrowed(source_node),
566 &mut node_id_cache,
567 )?;
568 let target = self.get_cached_node_id_from_name(
569 Cow::Borrowed(target_node),
570 &mut node_id_cache,
571 )?;
572 if let (Some(source), Some(target)) = (source, target)
573 && let Ok(ctype) = CT::from_str(component_type)
574 {
575 let c = Component::new(ctype, layer.into(), component_name.into());
576 let gs = self.get_or_create_writable(&c)?;
577 let e = Edge { source, target };
579 if gs.is_connected(source, target, 1, Included(1))? {
580 let anno = Annotation {
581 key: AnnoKey {
582 ns: anno_ns.into(),
583 name: anno_name.into(),
584 },
585 val: anno_value.into(),
586 };
587 gs.add_edge_annotation(e, anno)?;
588 }
589 }
590 }
591 UpdateEvent::DeleteEdgeLabel {
592 source_node,
593 target_node,
594 layer,
595 component_type,
596 component_name,
597 anno_ns,
598 anno_name,
599 } => {
600 let source = self.get_cached_node_id_from_name(
601 Cow::Borrowed(source_node),
602 &mut node_id_cache,
603 )?;
604 let target = self.get_cached_node_id_from_name(
605 Cow::Borrowed(target_node),
606 &mut node_id_cache,
607 )?;
608 if let (Some(source), Some(target)) = (source, target)
609 && let Ok(ctype) = CT::from_str(component_type)
610 {
611 let c = Component::new(ctype, layer.into(), component_name.into());
612 let gs = self.get_or_create_writable(&c)?;
613 let e = Edge { source, target };
615 if gs.is_connected(source, target, 1, Included(1))? {
616 let key = AnnoKey {
617 ns: anno_ns.into(),
618 name: anno_name.into(),
619 };
620 gs.delete_edge_annotation(&e, &key)?;
621 }
622 }
623 }
624 } ComponentType::after_update_event(change, self, &mut update_graph_index)?;
626 self.current_change_id = id;
627
628 if nr_updates > 0 && nr_updates % 100_000 == 0 {
629 let progress = ((nr_updates as f64) / (total_nr_updates as f64)) * 100.0;
631 progress_callback(&format!(
632 "applied {:.2}% of the atomic updates ({}/{})",
633 progress, nr_updates, total_nr_updates,
634 ));
635 }
636 } if update_statistics {
639 progress_callback("calculating all statistics");
640 self.calculate_all_statistics()?;
641 }
642
643 progress_callback("extending graph with model-specific index");
644 ComponentType::apply_update_graph_index(update_graph_index, self)?;
645
646 Ok(())
647 }
648
649 pub fn apply_update<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
652 where
653 F: Fn(&str),
654 {
655 progress_callback("applying list of atomic updates");
656
657 self.ensure_loaded_all()?;
659
660 let result = self.apply_update_in_memory(u, true, &progress_callback);
661 progress_callback("memory updates completed, persisting updates to disk");
662 self.persist_updates(u, result, progress_callback)?;
663 Ok(())
664 }
665
666 pub fn apply_update_keep_statistics<F>(
669 &mut self,
670 u: &mut GraphUpdate,
671 progress_callback: F,
672 ) -> Result<()>
673 where
674 F: Fn(&str),
675 {
676 progress_callback("applying list of atomic updates");
677
678 self.ensure_loaded_all()?;
680
681 let result = self.apply_update_in_memory(u, false, &progress_callback);
682 progress_callback("memory updates completed, persisting updates to disk");
683 self.persist_updates(u, result, progress_callback)?;
684 Ok(())
685 }
686
687 fn persist_updates<F>(
688 &mut self,
689 u: &mut GraphUpdate,
690 apply_update_result: Result<()>,
691 progress_callback: F,
692 ) -> Result<()>
693 where
694 F: Fn(&str),
695 {
696 if let Some(location) = self.location.clone() {
697 trace!("output location for persisting updates is {:?}", location);
698 if apply_update_result.is_ok() {
699 let current_path = location.join("current");
700 std::fs::create_dir_all(¤t_path)?;
702
703 let log_path = current_path.join("update_log.bin");
705
706 let temporary_dir = tempfile::tempdir_in(¤t_path)?;
708 let mut temporary_disk_file = tempfile::NamedTempFile::new_in(&temporary_dir)?;
709
710 debug!("writing WAL update log to {:?}", temporary_disk_file.path());
711 bincode::serialize_into(temporary_disk_file.as_file(), &u)?;
712 temporary_disk_file.flush()?;
713 debug!("moving finished WAL update log to {:?}", &log_path);
714 temporary_disk_file.persist(&log_path)?;
716
717 progress_callback("finished writing WAL update log");
718 } else {
719 trace!(
720 "error occured while applying updates: {:?}",
721 &apply_update_result
722 );
723 self.open(&location)?;
725 self.ensure_loaded_all()?;
726 }
727 }
728
729 apply_update_result
730 }
731
732 pub fn background_sync_wal_updates(&self) -> Result<()> {
734 if let Some(ref location) = self.location {
737 let _lock = self.background_persistance.lock()?;
739
740 self.internal_save_with_backup(location)?;
741 }
742
743 Ok(())
744 }
745
746 fn internal_save_with_backup(&self, location: &Path) -> Result<()> {
750 let backup_location = location.join("backup");
757 let current_location = location.join("current");
758 if !backup_location.exists() {
759 std::fs::rename(¤t_location, &backup_location)?;
760 }
761
762 self.internal_save(¤t_location)?;
764
765 let tmp_dir = tempfile::Builder::new()
767 .prefix("temporary-graphannis-backup")
768 .tempdir_in(location)?;
769 std::fs::remove_dir(tmp_dir.path())?;
771 std::fs::rename(&backup_location, tmp_dir.path())?;
772 tmp_dir.close()?;
774 Ok(())
775 }
776
777 fn ensure_writeable(&mut self, c: &Component<CT>) -> Result<()> {
778 self.ensure_loaded(c)?;
779 if let Some(gs_opt) = self.components.get_mut(c) {
781 let gs = gs_opt
783 .as_mut()
784 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
785 let is_writable = {
787 Arc::get_mut(gs)
788 .ok_or_else(|| {
789 GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string())
790 })?
791 .as_writeable()
792 .is_some()
793 };
794 if is_writable {
795 return Ok(());
796 }
797 } else {
798 return Ok(());
800 }
801
802 let readonly_gs = self
804 .components
805 .get(c)
806 .cloned()
807 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
808 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
809 let writable_gs = registry::create_writeable(self, Some(readonly_gs.as_ref()))?;
810 self.components.insert(c.to_owned(), Some(writable_gs));
811
812 Ok(())
813 }
814
815 pub fn calculate_all_statistics(&mut self) -> Result<()> {
817 self.ensure_loaded_all()?;
818
819 debug!("Calculating node statistics");
820 self.node_annos.calculate_statistics()?;
821 for c in self.get_all_components(None, None) {
822 debug!("Calculating statistics for component {}", &c);
823 self.calculate_component_statistics(&c)?;
824 }
825
826 debug!("Calculating global graph statistics");
827 CT::calculate_global_statistics(self)?;
828
829 Ok(())
830 }
831
832 pub fn calculate_component_statistics(&mut self, c: &Component<CT>) -> Result<()> {
834 let mut result: Result<()> = Ok(());
835 let mut entry = self
836 .components
837 .remove(c)
838 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?;
839 if let Some(ref mut gs) = entry {
840 if let Some(gs_mut) = Arc::get_mut(gs) {
841 if let Some(writeable_gs) = gs_mut.as_writeable() {
843 writeable_gs.calculate_statistics()?;
844 }
845 } else {
846 result = Err(GraphAnnisCoreError::NonExclusiveComponentReference(
847 c.to_string(),
848 ));
849 }
850 }
851 self.components.insert(c.clone(), entry);
853 result
854 }
855
856 pub fn get_or_create_writable(
860 &mut self,
861 c: &Component<CT>,
862 ) -> Result<&mut dyn WriteableGraphStorage> {
863 if self.components.contains_key(c) {
864 self.ensure_writeable(c)?;
866 } else {
867 let w = registry::create_writeable(self, None)?;
868
869 self.components.insert(c.clone(), Some(w));
870 }
871
872 let entry: &mut Arc<dyn GraphStorage> = self
874 .components
875 .get_mut(c)
876 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
877 .as_mut()
878 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
879
880 let gs_mut_ref: &mut dyn GraphStorage = Arc::get_mut(entry)
881 .ok_or_else(|| GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string()))?;
882 let result = gs_mut_ref
883 .as_writeable()
884 .ok_or_else(|| GraphAnnisCoreError::ReadOnlyComponent(c.to_string()))?;
885 Ok(result)
886 }
887
888 pub fn is_loaded(&self, c: &Component<CT>) -> bool {
890 let entry: Option<&Option<Arc<dyn GraphStorage>>> = self.components.get(c);
891 if let Some(gs_opt) = entry
892 && gs_opt.is_some()
893 {
894 return true;
895 }
896 false
897 }
898
899 pub fn ensure_loaded_all(&mut self) -> Result<()> {
901 let mut components_to_load: Vec<_> = Vec::with_capacity(self.components.len());
902
903 for (c, gs) in &self.components {
905 if gs.is_none() {
906 components_to_load.push(c.clone());
907 }
908 }
909
910 self.ensure_loaded_parallel(&components_to_load)?;
911 Ok(())
912 }
913
914 pub fn ensure_loaded(&mut self, c: &Component<CT>) -> Result<()> {
916 if let Some(gs_opt) = self.components.get_mut(c) {
918 if gs_opt.is_none() {
920 let component_path = component_path(&self.location, c)
921 .ok_or(GraphAnnisCoreError::EmptyComponentPath)?;
922 debug!(
923 "loading component {} from {}",
924 c,
925 &component_path.to_string_lossy()
926 );
927 let component = load_component_from_disk(&component_path)?;
928 gs_opt.get_or_insert_with(|| component);
929 }
930 }
931 Ok(())
932 }
933
934 pub fn ensure_loaded_parallel(
939 &mut self,
940 components_to_load: &[Component<CT>],
941 ) -> Result<Vec<Component<CT>>> {
942 let components_to_load: Vec<_> = components_to_load
945 .iter()
946 .filter(|c| {
947 if let Some(e) = self.components.get(c) {
948 e.is_none()
949 } else {
950 false
951 }
952 })
953 .collect();
954
955 let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
957 .into_par_iter()
958 .map(|c| match component_path(&self.location, c) {
959 Some(cpath) => {
960 debug!(
961 "loading component in parallel {} from {}",
962 c,
963 &cpath.to_string_lossy()
964 );
965 (c, load_component_from_disk(&cpath))
966 }
967 None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
968 })
969 .collect();
970
971 let mut result = Vec::with_capacity(loaded_components.len());
973 for (c, gs) in loaded_components {
974 let gs = gs?;
975 self.components.insert(c.clone(), Some(gs));
976 result.push(c.clone());
977 }
978 Ok(result)
979 }
980
981 pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
982 self.ensure_loaded_all()?;
983
984 if self.disk_based != disk_based {
985 self.disk_based = disk_based;
986
987 let mut new_node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
989 Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
990 } else {
991 Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
992 };
993
994 info!("copying node annotations");
996 for m in self
997 .node_annos
998 .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Any)
999 {
1000 let m = m?;
1001 for anno in self.node_annos.get_annotations_for_item(&m.node)? {
1002 new_node_annos.insert(m.node, anno)?;
1003 }
1004 }
1005 self.node_annos = new_node_annos;
1006 }
1007
1008 info!("re-calculating all statistics");
1009 self.calculate_all_statistics()?;
1010
1011 for c in self.get_all_components(None, None) {
1012 info!("optimizing implementation for component {}", &c);
1014 self.optimize_gs_impl(&c)?;
1015 }
1016 if let Some(location) = &self.location {
1017 info!("saving corpus to disk");
1018 self.internal_save_with_backup(location)?;
1019 }
1020 Ok(())
1021 }
1022
1023 pub fn optimize_gs_impl(&mut self, c: &Component<CT>) -> Result<()> {
1024 if let Some(gs) = self.get_graphstorage(c)
1025 && let Some(stats) = gs.get_statistics()
1026 {
1027 let opt_info = registry::get_optimal_impl_heuristic(self, stats);
1028
1029 if opt_info.id != gs.serialization_id() {
1031 let mut new_gs = registry::create_from_info(&opt_info)?;
1032 let converted = if let Some(new_gs_mut) = Arc::get_mut(&mut new_gs) {
1033 info!(
1034 "converting component {} to implementation {}",
1035 c, opt_info.id,
1036 );
1037 new_gs_mut.copy(self.get_node_annos(), gs.as_ref())?;
1038 true
1039 } else {
1040 false
1041 };
1042 if converted {
1043 info!(
1045 "finished conversion of component {} to implementation {}",
1046 c, opt_info.id,
1047 );
1048 self.components.insert(c.clone(), Some(new_gs.clone()));
1049 }
1050 }
1051 }
1052
1053 Ok(())
1054 }
1055
1056 pub fn get_graphstorage(&self, c: &Component<CT>) -> Option<Arc<dyn GraphStorage>> {
1058 let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1060 Some(entry.clone())
1061 }
1062
1063 pub fn get_graphstorage_as_ref<'a>(
1065 &'a self,
1066 c: &Component<CT>,
1067 ) -> Option<&'a dyn GraphStorage> {
1068 let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1070 Some(entry.as_ref())
1071 }
1072
1073 pub fn get_node_annos(&self) -> &dyn NodeAnnotationStorage {
1075 self.node_annos.as_ref()
1076 }
1077
1078 pub fn get_node_annos_mut(&mut self) -> &mut dyn NodeAnnotationStorage {
1080 self.node_annos.as_mut()
1081 }
1082
1083 pub fn get_all_components(&self, ctype: Option<CT>, name: Option<&str>) -> Vec<Component<CT>> {
1087 if let (Some(ctype), Some(name)) = (&ctype, name) {
1088 let mut result: Vec<_> = Vec::new();
1090 let ckey = Component::new(ctype.clone(), String::default(), name.into());
1091
1092 for (c, _) in self.components.range(ckey..) {
1093 if c.name != name || &c.get_type() != ctype {
1094 break;
1095 }
1096 result.push(c.clone());
1097 }
1098 result
1099 } else if let Some(ctype) = &ctype {
1100 let mut result: Vec<_> = Vec::new();
1102 let ckey = Component::new(ctype.clone(), String::default(), String::default());
1103
1104 for (c, _) in self.components.range(ckey..) {
1105 if &c.get_type() != ctype {
1106 break;
1107 }
1108 result.push(c.clone());
1109 }
1110 result
1111 } else {
1112 let filtered_components = self
1114 .components
1115 .keys()
1116 .filter(move |c| {
1117 if let Some(ctype) = ctype.clone()
1118 && ctype != c.get_type()
1119 {
1120 return false;
1121 }
1122 if let Some(name) = name
1123 && name != c.name
1124 {
1125 return false;
1126 }
1127 true
1128 })
1129 .cloned();
1130 filtered_components.collect()
1131 }
1132 }
1133}
1134
1135#[cfg(test)]
1136mod tests;