1pub mod serialization;
2pub mod storage;
3pub mod update;
4
5use crate::{
6 annostorage::{AnnotationStorage, NodeAnnotationStorage, ValueSearch},
7 errors::Result,
8 graph::storage::{registry, GraphStorage, WriteableGraphStorage},
9};
10use crate::{
11 errors::GraphAnnisCoreError,
12 types::{AnnoKey, Annotation, Component, ComponentType, Edge, NodeID},
13};
14use clru::CLruCache;
15use rayon::prelude::*;
16use smartstring::alias::String as SmartString;
17use std::io::prelude::*;
18use std::ops::Bound::Included;
19use std::path::{Path, PathBuf};
20use std::string::ToString;
21use std::{
22 borrow::Cow,
23 sync::{Arc, Mutex},
24};
25use std::{collections::BTreeMap, num::NonZeroUsize};
26use update::{GraphUpdate, UpdateEvent};
27
28pub const ANNIS_NS: &str = "annis";
29pub const DEFAULT_NS: &str = "default_ns";
30pub const NODE_NAME: &str = "node_name";
31pub const NODE_TYPE: &str = "node_type";
32pub const DEFAULT_EMPTY_LAYER: &str = "default_layer";
33
34const GLOBAL_STATISTICS_FILE_NAME: &str = "global_statistics.toml";
35
36lazy_static! {
37 pub static ref DEFAULT_ANNO_KEY: Arc<AnnoKey> = Arc::from(AnnoKey::default());
38 pub static ref NODE_NAME_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
39 ns: ANNIS_NS.into(),
40 name: NODE_NAME.into(),
41 });
42 pub static ref NODE_TYPE_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
44 ns: ANNIS_NS.into(),
45 name: NODE_TYPE.into(),
46 });
47}
48
/// A graph of annotated nodes whose edges are organized into typed components.
///
/// Node annotations live in a single annotation storage. The edges of each
/// [`Component`] are kept in their own graph storage. A component that is
/// registered (e.g. found on disk) but not loaded yet is stored as `None` until
/// it is needed.
pub struct Graph<CT: ComponentType> {
    /// Storage for all node annotations, including node names and node types.
    node_annos: Box<dyn NodeAnnotationStorage>,

    /// Directory the graph was opened from or persisted to, if any.
    location: Option<PathBuf>,

    /// All known components. `None` means the component is registered but its
    /// graph storage has not been loaded into memory yet.
    components: BTreeMap<Component<CT>, Option<Arc<dyn GraphStorage>>>,
    /// ID of the most recently applied update event.
    current_change_id: u64,

    /// Lock held while `background_sync_wal_updates` saves the graph, so that
    /// these saves do not run concurrently.
    background_persistance: Arc<Mutex<()>>,

    /// Global statistics of the component type; read from and written to
    /// `global_statistics.toml` next to the graph data.
    pub global_statistics: Option<CT::GlobalStatistics>,

    /// Whether node annotations use the on-disk instead of the in-memory
    /// annotation storage.
    disk_based: bool,
}
69
70fn load_component_from_disk(component_path: &Path) -> Result<Arc<dyn GraphStorage>> {
71 let impl_path = PathBuf::from(component_path).join("impl.cfg");
73 let mut f_impl = std::fs::File::open(impl_path)?;
74 let mut impl_name = String::new();
75 f_impl.read_to_string(&mut impl_name)?;
76
77 let gs = registry::deserialize(&impl_name, component_path)?;
78
79 Ok(gs)
80}
81
82fn component_to_relative_path<CT: ComponentType>(c: &Component<CT>) -> PathBuf {
83 let mut p = PathBuf::new();
84 p.push("gs");
85 p.push(c.get_type().to_string());
86 p.push(if c.layer.is_empty() {
87 DEFAULT_EMPTY_LAYER
88 } else {
89 &c.layer
90 });
91 p.push(c.name.as_str());
92 p
93}
94
95fn component_path<CT: ComponentType>(
96 location: &Option<PathBuf>,
97 c: &Component<CT>,
98) -> Option<PathBuf> {
99 match location {
100 Some(ref loc) => {
101 let mut p = PathBuf::from(loc);
102 let backup = loc.join("backup");
104 if backup.exists() {
105 p.push("backup");
106 } else {
107 p.push("current");
108 }
109 p.push(component_to_relative_path(c));
110 Some(p)
111 }
112 None => None,
113 }
114}
115
116impl<CT: ComponentType> Graph<CT> {
117 pub fn new(disk_based: bool) -> Result<Self> {
119 let node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
120 Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
121 } else {
122 Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
123 };
124
125 Ok(Graph {
126 node_annos,
127 components: BTreeMap::new(),
128 global_statistics: None,
129 location: None,
130
131 current_change_id: 0,
132
133 background_persistance: Arc::new(Mutex::new(())),
134
135 disk_based,
136 })
137 }
138
139 pub fn with_default_graphstorages(disk_based: bool) -> Result<Self> {
141 let mut db = Graph::new(disk_based)?;
142 for c in CT::default_components() {
143 db.get_or_create_writable(&c)?;
144 }
145 Ok(db)
146 }
147
148 pub fn open(&mut self, location: &Path) -> Result<()> {
153 debug!("Opening corpus from {}", location.to_string_lossy());
154 self.clear()?;
155 self.location = Some(location.to_path_buf());
156 self.internal_open(location)?;
157
158 Ok(())
159 }
160
161 pub fn import(&mut self, location: &Path) -> Result<()> {
167 debug!("Importing corpus from {}", location.to_string_lossy());
168 self.clear()?;
169 self.location = Some(location.to_path_buf());
171 self.internal_open(location)?;
172 self.ensure_loaded_all()?;
173 self.location = None;
175 Ok(())
176 }
177
178 #[deprecated(note = "Please use `open` instead")]
184 pub fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
185 self.open(location)?;
186 if preload {
187 self.ensure_loaded_all()?;
188 }
189 Ok(())
190 }
191
192 fn internal_open(&mut self, location: &Path) -> Result<()> {
198 let backup = location.join("backup");
199
200 let mut load_from_backup = false;
201 let dir2load = if backup.exists() && backup.is_dir() {
202 load_from_backup = true;
203 backup.clone()
204 } else {
205 location.join("current")
206 };
207
208 self.global_statistics = None;
210 let global_statistics_file = dir2load.join(GLOBAL_STATISTICS_FILE_NAME);
211 if global_statistics_file.exists() && global_statistics_file.is_file() {
212 let file_content = std::fs::read_to_string(global_statistics_file)?;
213 self.global_statistics = Some(toml::from_str(&file_content)?);
214 }
215
216 let ondisk_subdirectory = dir2load.join(crate::annostorage::ondisk::SUBFOLDER_NAME);
218 if ondisk_subdirectory.exists() && ondisk_subdirectory.is_dir() {
219 self.disk_based = true;
220 let node_annos_tmp =
222 crate::annostorage::ondisk::AnnoStorageImpl::new(Some(ondisk_subdirectory))?;
223 self.node_annos = Box::new(node_annos_tmp);
224 } else {
225 self.disk_based = false;
227 let mut node_annos_tmp = crate::annostorage::inmemory::AnnoStorageImpl::new();
228 node_annos_tmp.load_annotations_from(&dir2load)?;
229 self.node_annos = Box::new(node_annos_tmp);
230 }
231
232 let log_path = dir2load.join("update_log.bin");
233
234 let logfile_exists = log_path.exists() && log_path.is_file();
235
236 self.find_components_from_disk(&dir2load)?;
237
238 if logfile_exists | load_from_backup {
240 self.ensure_loaded_all()?;
241 }
242
243 if logfile_exists {
244 let log_reader = std::fs::File::open(&log_path)?;
246 let mut update = bincode::deserialize_from(log_reader)?;
247 self.apply_update_in_memory(&mut update, true, |_| {})?;
248 } else {
249 self.current_change_id = 0;
250 }
251
252 if load_from_backup {
253 self.save_to(&location.join("current"))?;
255 let tmp_dir = tempfile::Builder::new()
257 .prefix("temporary-graphannis-backup")
258 .tempdir_in(location)?;
259 std::fs::remove_dir(tmp_dir.path())?;
261 std::fs::rename(&backup, tmp_dir.path())?;
262 tmp_dir.close()?;
264 }
265
266 Ok(())
267 }
268
269 pub fn save_to(&mut self, location: &Path) -> Result<()> {
271 self.ensure_loaded_all()?;
273 self.internal_save(&location.join("current"))
274 }
275
276 pub fn persist_to(&mut self, location: &Path) -> Result<()> {
278 self.location = Some(location.to_path_buf());
279 self.internal_save(&location.join("current"))
280 }
281
282 fn clear(&mut self) -> Result<()> {
285 self.node_annos = Box::new(crate::annostorage::inmemory::AnnoStorageImpl::new());
286 self.components.clear();
287 Ok(())
288 }
289
290 fn find_components_from_disk(&mut self, location: &Path) -> Result<()> {
291 self.components.clear();
292
293 for c in CT::all_component_types().into_iter() {
295 let cpath = PathBuf::from(location).join("gs").join(c.to_string());
296
297 if cpath.is_dir() {
298 for layer in cpath.read_dir()? {
300 let layer = layer?;
301 if layer.path().is_dir() {
302 let layer_file_name = layer.file_name();
304 let layer_name_from_file = layer_file_name.to_string_lossy();
305 let layer_name: SmartString = if layer_name_from_file == DEFAULT_EMPTY_LAYER
306 {
307 SmartString::default()
308 } else {
309 layer_name_from_file.into()
310 };
311 let empty_name_component =
312 Component::new(c.clone(), layer_name.clone(), SmartString::default());
313 {
314 let cfg_file = PathBuf::from(location)
315 .join(component_to_relative_path(&empty_name_component))
316 .join("impl.cfg");
317
318 if cfg_file.is_file() {
319 self.components.insert(empty_name_component.clone(), None);
320 debug!("Registered component {}", empty_name_component);
321 }
322 }
323 for name in layer.path().read_dir()? {
325 let name = name?;
326 let named_component = Component::new(
327 c.clone(),
328 layer_name.clone(),
329 name.file_name().to_string_lossy().into(),
330 );
331 let cfg_file = PathBuf::from(location)
332 .join(component_to_relative_path(&named_component))
333 .join("impl.cfg");
334
335 if cfg_file.is_file() {
336 self.components.insert(named_component.clone(), None);
337 debug!("Registered component {}", named_component);
338 }
339 }
340 }
341 }
342 }
343 } Ok(())
345 }
346
347 fn internal_save(&self, location: &Path) -> Result<()> {
348 let location = PathBuf::from(location);
349
350 std::fs::create_dir_all(&location)?;
351
352 self.node_annos.save_annotations_to(&location)?;
353
354 for (c, e) in &self.components {
355 if let Some(ref data) = *e {
356 let dir = PathBuf::from(&location).join(component_to_relative_path(c));
357 std::fs::create_dir_all(&dir)?;
358
359 let impl_name = data.serialization_id();
360 data.save_to(&dir)?;
361
362 let cfg_path = PathBuf::from(&dir).join("impl.cfg");
363 let mut f_cfg = std::fs::File::create(cfg_path)?;
364 f_cfg.write_all(impl_name.as_bytes())?;
365 }
366 }
367
368 if let Some(s) = &self.global_statistics {
370 let file_content = toml::to_string(s)?;
371 std::fs::write(location.join(GLOBAL_STATISTICS_FILE_NAME), file_content)?;
372 }
373
374 Ok(())
375 }
376
377 fn get_cached_node_id_from_name(
378 &self,
379 node_name: Cow<String>,
380 cache: &mut CLruCache<String, Option<NodeID>>,
381 ) -> Result<Option<NodeID>> {
382 if let Some(id) = cache.get(node_name.as_ref()) {
383 Ok(*id)
384 } else {
385 let id = self.node_annos.get_node_id_from_name(&node_name)?;
386 cache.put(node_name.to_string(), id);
387 Ok(id)
388 }
389 }
390
391 #[allow(clippy::cognitive_complexity)]
392 fn apply_update_in_memory<F>(
393 &mut self,
394 u: &mut GraphUpdate,
395 update_statistics: bool,
396 progress_callback: F,
397 ) -> Result<()>
398 where
399 F: Fn(&str),
400 {
401 let all_components = self.get_all_components(None, None);
402
403 let mut update_graph_index = ComponentType::init_update_graph_index(self)?;
404 let cache_size = NonZeroUsize::new(1_000).ok_or(GraphAnnisCoreError::ZeroCacheSize)?;
406 let mut node_id_cache = CLruCache::new(cache_size);
407 let total_nr_updates = u.len()?;
409 progress_callback(&format!("applying {} atomic updates", total_nr_updates));
410 for (nr_updates, update_event) in u.iter()?.enumerate() {
411 let (id, change) = update_event?;
412 trace!("applying event {:?}", &change);
413 ComponentType::before_update_event(&change, self, &mut update_graph_index)?;
414 match &change {
415 UpdateEvent::AddNode {
416 node_name,
417 node_type,
418 } => {
419 if !self.node_annos.has_node_name(node_name)? {
421 let new_node_id: NodeID =
422 if let Some(id) = self.node_annos.get_largest_item()? {
423 id + 1
424 } else {
425 0
426 };
427
428 let new_anno_name = Annotation {
429 key: NODE_NAME_KEY.as_ref().clone(),
430 val: node_name.into(),
431 };
432 let new_anno_type = Annotation {
433 key: NODE_TYPE_KEY.as_ref().clone(),
434 val: node_type.into(),
435 };
436
437 self.node_annos.insert(new_node_id, new_anno_name)?;
439 self.node_annos.insert(new_node_id, new_anno_type)?;
440
441 node_id_cache.put(node_name.clone(), Some(new_node_id));
443 }
444 }
445 UpdateEvent::DeleteNode { node_name } => {
446 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
447 Cow::Borrowed(node_name),
448 &mut node_id_cache,
449 )? {
450 self.node_annos.remove_item(&existing_node_id)?;
452
453 for c in all_components.iter() {
455 if let Ok(gs) = self.get_or_create_writable(c) {
456 gs.delete_node(existing_node_id)?;
457 }
458 }
459
460 node_id_cache.put(node_name.clone(), None);
462 }
463 }
464 UpdateEvent::AddNodeLabel {
465 node_name,
466 anno_ns,
467 anno_name,
468 anno_value,
469 } => {
470 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
471 Cow::Borrowed(node_name),
472 &mut node_id_cache,
473 )? {
474 let anno = Annotation {
475 key: AnnoKey {
476 ns: anno_ns.into(),
477 name: anno_name.into(),
478 },
479 val: anno_value.into(),
480 };
481 self.node_annos.insert(existing_node_id, anno)?;
482 }
483 }
484 UpdateEvent::DeleteNodeLabel {
485 node_name,
486 anno_ns,
487 anno_name,
488 } => {
489 if let Some(existing_node_id) = self.get_cached_node_id_from_name(
490 Cow::Borrowed(node_name),
491 &mut node_id_cache,
492 )? {
493 let key = AnnoKey {
494 ns: anno_ns.into(),
495 name: anno_name.into(),
496 };
497 self.node_annos
498 .remove_annotation_for_item(&existing_node_id, &key)?;
499 }
500 }
501 UpdateEvent::AddEdge {
502 source_node,
503 target_node,
504 layer,
505 component_type,
506 component_name,
507 } => {
508 let source = self.get_cached_node_id_from_name(
509 Cow::Borrowed(source_node),
510 &mut node_id_cache,
511 )?;
512 let target = self.get_cached_node_id_from_name(
513 Cow::Borrowed(target_node),
514 &mut node_id_cache,
515 )?;
516 if let (Some(source), Some(target)) = (source, target) {
518 if let Ok(ctype) = CT::from_str(component_type) {
519 let c = Component::new(ctype, layer.into(), component_name.into());
520 let gs = self.get_or_create_writable(&c)?;
521 gs.add_edge(Edge { source, target })?;
522 }
523 }
524 }
525 UpdateEvent::DeleteEdge {
526 source_node,
527 target_node,
528 layer,
529 component_type,
530 component_name,
531 } => {
532 let source = self.get_cached_node_id_from_name(
533 Cow::Borrowed(source_node),
534 &mut node_id_cache,
535 )?;
536 let target = self.get_cached_node_id_from_name(
537 Cow::Borrowed(target_node),
538 &mut node_id_cache,
539 )?;
540 if let (Some(source), Some(target)) = (source, target) {
541 if let Ok(ctype) = CT::from_str(component_type) {
542 let c = Component::new(ctype, layer.into(), component_name.into());
543
544 let gs = self.get_or_create_writable(&c)?;
545 gs.delete_edge(&Edge { source, target })?;
546 }
547 }
548 }
549 UpdateEvent::AddEdgeLabel {
550 source_node,
551 target_node,
552 layer,
553 component_type,
554 component_name,
555 anno_ns,
556 anno_name,
557 anno_value,
558 } => {
559 let source = self.get_cached_node_id_from_name(
560 Cow::Borrowed(source_node),
561 &mut node_id_cache,
562 )?;
563 let target = self.get_cached_node_id_from_name(
564 Cow::Borrowed(target_node),
565 &mut node_id_cache,
566 )?;
567 if let (Some(source), Some(target)) = (source, target) {
568 if let Ok(ctype) = CT::from_str(component_type) {
569 let c = Component::new(ctype, layer.into(), component_name.into());
570 let gs = self.get_or_create_writable(&c)?;
571 let e = Edge { source, target };
573 if gs.is_connected(source, target, 1, Included(1))? {
574 let anno = Annotation {
575 key: AnnoKey {
576 ns: anno_ns.into(),
577 name: anno_name.into(),
578 },
579 val: anno_value.into(),
580 };
581 gs.add_edge_annotation(e, anno)?;
582 }
583 }
584 }
585 }
586 UpdateEvent::DeleteEdgeLabel {
587 source_node,
588 target_node,
589 layer,
590 component_type,
591 component_name,
592 anno_ns,
593 anno_name,
594 } => {
595 let source = self.get_cached_node_id_from_name(
596 Cow::Borrowed(source_node),
597 &mut node_id_cache,
598 )?;
599 let target = self.get_cached_node_id_from_name(
600 Cow::Borrowed(target_node),
601 &mut node_id_cache,
602 )?;
603 if let (Some(source), Some(target)) = (source, target) {
604 if let Ok(ctype) = CT::from_str(component_type) {
605 let c = Component::new(ctype, layer.into(), component_name.into());
606 let gs = self.get_or_create_writable(&c)?;
607 let e = Edge { source, target };
609 if gs.is_connected(source, target, 1, Included(1))? {
610 let key = AnnoKey {
611 ns: anno_ns.into(),
612 name: anno_name.into(),
613 };
614 gs.delete_edge_annotation(&e, &key)?;
615 }
616 }
617 }
618 }
619 } ComponentType::after_update_event(change, self, &mut update_graph_index)?;
621 self.current_change_id = id;
622
623 if nr_updates > 0 && nr_updates % 100_000 == 0 {
624 let progress = ((nr_updates as f64) / (total_nr_updates as f64)) * 100.0;
626 progress_callback(&format!(
627 "applied {:.2}% of the atomic updates ({}/{})",
628 progress, nr_updates, total_nr_updates,
629 ));
630 }
631 } if update_statistics {
634 progress_callback("calculating all statistics");
635 self.calculate_all_statistics()?;
636 }
637
638 progress_callback("extending graph with model-specific index");
639 ComponentType::apply_update_graph_index(update_graph_index, self)?;
640
641 Ok(())
642 }
643
644 pub fn apply_update<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
647 where
648 F: Fn(&str),
649 {
650 progress_callback("applying list of atomic updates");
651
652 self.ensure_loaded_all()?;
654
655 let result = self.apply_update_in_memory(u, true, &progress_callback);
656 progress_callback("memory updates completed, persisting updates to disk");
657 self.persist_updates(u, result, progress_callback)?;
658 Ok(())
659 }
660
661 pub fn apply_update_keep_statistics<F>(
664 &mut self,
665 u: &mut GraphUpdate,
666 progress_callback: F,
667 ) -> Result<()>
668 where
669 F: Fn(&str),
670 {
671 progress_callback("applying list of atomic updates");
672
673 self.ensure_loaded_all()?;
675
676 let result = self.apply_update_in_memory(u, false, &progress_callback);
677 progress_callback("memory updates completed, persisting updates to disk");
678 self.persist_updates(u, result, progress_callback)?;
679 Ok(())
680 }
681
682 fn persist_updates<F>(
683 &mut self,
684 u: &mut GraphUpdate,
685 apply_update_result: Result<()>,
686 progress_callback: F,
687 ) -> Result<()>
688 where
689 F: Fn(&str),
690 {
691 if let Some(location) = self.location.clone() {
692 trace!("output location for persisting updates is {:?}", location);
693 if apply_update_result.is_ok() {
694 let current_path = location.join("current");
695 std::fs::create_dir_all(¤t_path)?;
697
698 let log_path = current_path.join("update_log.bin");
700
701 let temporary_dir = tempfile::tempdir_in(¤t_path)?;
703 let mut temporary_disk_file = tempfile::NamedTempFile::new_in(&temporary_dir)?;
704
705 debug!("writing WAL update log to {:?}", temporary_disk_file.path());
706 bincode::serialize_into(temporary_disk_file.as_file(), &u)?;
707 temporary_disk_file.flush()?;
708 debug!("moving finished WAL update log to {:?}", &log_path);
709 temporary_disk_file.persist(&log_path)?;
711
712 progress_callback("finished writing WAL update log");
713 } else {
714 trace!(
715 "error occured while applying updates: {:?}",
716 &apply_update_result
717 );
718 self.open(&location)?;
720 self.ensure_loaded_all()?;
721 }
722 }
723
724 apply_update_result
725 }
726
727 pub fn background_sync_wal_updates(&self) -> Result<()> {
729 if let Some(ref location) = self.location {
732 let _lock = self.background_persistance.lock()?;
734
735 self.internal_save_with_backup(location)?;
736 }
737
738 Ok(())
739 }
740
741 fn internal_save_with_backup(&self, location: &Path) -> Result<()> {
745 let backup_location = location.join("backup");
752 let current_location = location.join("current");
753 if !backup_location.exists() {
754 std::fs::rename(¤t_location, &backup_location)?;
755 }
756
757 self.internal_save(¤t_location)?;
759
760 let tmp_dir = tempfile::Builder::new()
762 .prefix("temporary-graphannis-backup")
763 .tempdir_in(location)?;
764 std::fs::remove_dir(tmp_dir.path())?;
766 std::fs::rename(&backup_location, tmp_dir.path())?;
767 tmp_dir.close()?;
769 Ok(())
770 }
771
772 fn ensure_writeable(&mut self, c: &Component<CT>) -> Result<()> {
773 self.ensure_loaded(c)?;
774 if let Some(gs_opt) = self.components.get_mut(c) {
776 let gs = gs_opt
778 .as_mut()
779 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
780 let is_writable = {
782 Arc::get_mut(gs)
783 .ok_or_else(|| {
784 GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string())
785 })?
786 .as_writeable()
787 .is_some()
788 };
789 if is_writable {
790 return Ok(());
791 }
792 } else {
793 return Ok(());
795 }
796
797 let readonly_gs = self
799 .components
800 .get(c)
801 .cloned()
802 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
803 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
804 let writable_gs = registry::create_writeable(self, Some(readonly_gs.as_ref()))?;
805 self.components.insert(c.to_owned(), Some(writable_gs));
806
807 Ok(())
808 }
809
810 pub fn calculate_all_statistics(&mut self) -> Result<()> {
812 self.ensure_loaded_all()?;
813
814 debug!("Calculating node statistics");
815 self.node_annos.calculate_statistics()?;
816 for c in self.get_all_components(None, None) {
817 debug!("Calculating statistics for component {}", &c);
818 self.calculate_component_statistics(&c)?;
819 }
820
821 debug!("Calculating global graph statistics");
822 CT::calculate_global_statistics(self)?;
823
824 Ok(())
825 }
826
827 pub fn calculate_component_statistics(&mut self, c: &Component<CT>) -> Result<()> {
829 let mut result: Result<()> = Ok(());
830 let mut entry = self
831 .components
832 .remove(c)
833 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?;
834 if let Some(ref mut gs) = entry {
835 if let Some(gs_mut) = Arc::get_mut(gs) {
836 if let Some(writeable_gs) = gs_mut.as_writeable() {
838 writeable_gs.calculate_statistics()?;
839 }
840 } else {
841 result = Err(GraphAnnisCoreError::NonExclusiveComponentReference(
842 c.to_string(),
843 ));
844 }
845 }
846 self.components.insert(c.clone(), entry);
848 result
849 }
850
851 pub fn get_or_create_writable(
855 &mut self,
856 c: &Component<CT>,
857 ) -> Result<&mut dyn WriteableGraphStorage> {
858 if self.components.contains_key(c) {
859 self.ensure_writeable(c)?;
861 } else {
862 let w = registry::create_writeable(self, None)?;
863
864 self.components.insert(c.clone(), Some(w));
865 }
866
867 let entry: &mut Arc<dyn GraphStorage> = self
869 .components
870 .get_mut(c)
871 .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
872 .as_mut()
873 .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
874
875 let gs_mut_ref: &mut dyn GraphStorage = Arc::get_mut(entry)
876 .ok_or_else(|| GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string()))?;
877 let result = gs_mut_ref
878 .as_writeable()
879 .ok_or_else(|| GraphAnnisCoreError::ReadOnlyComponent(c.to_string()))?;
880 Ok(result)
881 }
882
883 pub fn is_loaded(&self, c: &Component<CT>) -> bool {
885 let entry: Option<&Option<Arc<dyn GraphStorage>>> = self.components.get(c);
886 if let Some(gs_opt) = entry {
887 if gs_opt.is_some() {
888 return true;
889 }
890 }
891 false
892 }
893
894 pub fn ensure_loaded_all(&mut self) -> Result<()> {
896 let mut components_to_load: Vec<_> = Vec::with_capacity(self.components.len());
897
898 for (c, gs) in &self.components {
900 if gs.is_none() {
901 components_to_load.push(c.clone());
902 }
903 }
904
905 self.ensure_loaded_parallel(&components_to_load)?;
906 Ok(())
907 }
908
909 pub fn ensure_loaded(&mut self, c: &Component<CT>) -> Result<()> {
911 if let Some(gs_opt) = self.components.get_mut(c) {
913 if gs_opt.is_none() {
915 let component_path = component_path(&self.location, c)
916 .ok_or(GraphAnnisCoreError::EmptyComponentPath)?;
917 debug!(
918 "loading component {} from {}",
919 c,
920 &component_path.to_string_lossy()
921 );
922 let component = load_component_from_disk(&component_path)?;
923 gs_opt.get_or_insert_with(|| component);
924 }
925 }
926 Ok(())
927 }
928
929 pub fn ensure_loaded_parallel(
934 &mut self,
935 components_to_load: &[Component<CT>],
936 ) -> Result<Vec<Component<CT>>> {
937 let components_to_load: Vec<_> = components_to_load
940 .iter()
941 .filter(|c| {
942 if let Some(e) = self.components.get(c) {
943 e.is_none()
944 } else {
945 false
946 }
947 })
948 .collect();
949
950 let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
952 .into_par_iter()
953 .map(|c| match component_path(&self.location, c) {
954 Some(cpath) => {
955 debug!(
956 "loading component in parallel {} from {}",
957 c,
958 &cpath.to_string_lossy()
959 );
960 (c, load_component_from_disk(&cpath))
961 }
962 None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
963 })
964 .collect();
965
966 let mut result = Vec::with_capacity(loaded_components.len());
968 for (c, gs) in loaded_components {
969 let gs = gs?;
970 self.components.insert(c.clone(), Some(gs));
971 result.push(c.clone());
972 }
973 Ok(result)
974 }
975
976 pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
977 self.ensure_loaded_all()?;
978
979 if self.disk_based != disk_based {
980 self.disk_based = disk_based;
981
982 let mut new_node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
984 Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
985 } else {
986 Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
987 };
988
989 info!("copying node annotations");
991 for m in self
992 .node_annos
993 .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Any)
994 {
995 let m = m?;
996 for anno in self.node_annos.get_annotations_for_item(&m.node)? {
997 new_node_annos.insert(m.node, anno)?;
998 }
999 }
1000 self.node_annos = new_node_annos;
1001 }
1002
1003 info!("re-calculating all statistics");
1004 self.calculate_all_statistics()?;
1005
1006 for c in self.get_all_components(None, None) {
1007 info!("optimizing implementation for component {}", &c);
1009 self.optimize_gs_impl(&c)?;
1010 }
1011 if let Some(location) = &self.location {
1012 info!("saving corpus to disk");
1013 self.internal_save_with_backup(location)?;
1014 }
1015 Ok(())
1016 }
1017
1018 pub fn optimize_gs_impl(&mut self, c: &Component<CT>) -> Result<()> {
1019 if let Some(gs) = self.get_graphstorage(c) {
1020 if let Some(stats) = gs.get_statistics() {
1021 let opt_info = registry::get_optimal_impl_heuristic(self, stats);
1022
1023 if opt_info.id != gs.serialization_id() {
1025 let mut new_gs = registry::create_from_info(&opt_info)?;
1026 let converted = if let Some(new_gs_mut) = Arc::get_mut(&mut new_gs) {
1027 info!(
1028 "converting component {} to implementation {}",
1029 c, opt_info.id,
1030 );
1031 new_gs_mut.copy(self.get_node_annos(), gs.as_ref())?;
1032 true
1033 } else {
1034 false
1035 };
1036 if converted {
1037 info!(
1039 "finished conversion of component {} to implementation {}",
1040 c, opt_info.id,
1041 );
1042 self.components.insert(c.clone(), Some(new_gs.clone()));
1043 }
1044 }
1045 }
1046 }
1047
1048 Ok(())
1049 }
1050
1051 pub fn get_graphstorage(&self, c: &Component<CT>) -> Option<Arc<dyn GraphStorage>> {
1053 let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1055 Some(entry.clone())
1056 }
1057
1058 pub fn get_graphstorage_as_ref<'a>(
1060 &'a self,
1061 c: &Component<CT>,
1062 ) -> Option<&'a dyn GraphStorage> {
1063 let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1065 Some(entry.as_ref())
1066 }
1067
1068 pub fn get_node_annos(&self) -> &dyn NodeAnnotationStorage {
1070 self.node_annos.as_ref()
1071 }
1072
1073 pub fn get_node_annos_mut(&mut self) -> &mut dyn NodeAnnotationStorage {
1075 self.node_annos.as_mut()
1076 }
1077
1078 pub fn get_all_components(&self, ctype: Option<CT>, name: Option<&str>) -> Vec<Component<CT>> {
1082 if let (Some(ctype), Some(name)) = (&ctype, name) {
1083 let mut result: Vec<_> = Vec::new();
1085 let ckey = Component::new(ctype.clone(), SmartString::default(), name.into());
1086
1087 for (c, _) in self.components.range(ckey..) {
1088 if c.name != name || &c.get_type() != ctype {
1089 break;
1090 }
1091 result.push(c.clone());
1092 }
1093 result
1094 } else if let Some(ctype) = &ctype {
1095 let mut result: Vec<_> = Vec::new();
1097 let ckey = Component::new(
1098 ctype.clone(),
1099 SmartString::default(),
1100 SmartString::default(),
1101 );
1102
1103 for (c, _) in self.components.range(ckey..) {
1104 if &c.get_type() != ctype {
1105 break;
1106 }
1107 result.push(c.clone());
1108 }
1109 result
1110 } else {
1111 let filtered_components = self
1113 .components
1114 .keys()
1115 .filter(move |c| {
1116 if let Some(ctype) = ctype.clone() {
1117 if ctype != c.get_type() {
1118 return false;
1119 }
1120 }
1121 if let Some(name) = name {
1122 if name != c.name {
1123 return false;
1124 }
1125 }
1126 true
1127 })
1128 .cloned();
1129 filtered_components.collect()
1130 }
1131 }
1132}
1133
1134#[cfg(test)]
1135mod tests;