graphannis_core/graph/mod.rs

1pub mod serialization;
2pub mod storage;
3pub mod update;
4
5use crate::{
6    annostorage::{AnnotationStorage, NodeAnnotationStorage, ValueSearch},
7    errors::Result,
8    graph::storage::{GraphStorage, WriteableGraphStorage, registry},
9};
10use crate::{
11    errors::GraphAnnisCoreError,
12    types::{AnnoKey, Annotation, Component, ComponentType, Edge, NodeID},
13};
14use clru::CLruCache;
15use rayon::prelude::*;
16use std::ops::Bound::Included;
17use std::path::{Path, PathBuf};
18use std::string::ToString;
19use std::{
20    borrow::Cow,
21    sync::{Arc, Mutex},
22};
23use std::{collections::BTreeMap, num::NonZeroUsize};
24use std::{collections::BTreeSet, io::prelude::*};
25use update::{GraphUpdate, UpdateEvent};
26
/// Namespace of the special annotations that graphANNIS uses internally (e.g. `annis::node_name`).
pub const ANNIS_NS: &str = "annis";
/// Default namespace name (`default_ns`); presumably used when an annotation has no explicit
/// namespace — its users are not part of this module, confirm there.
pub const DEFAULT_NS: &str = "default_ns";
/// Name of the special annotation that holds the (unique) name of a node.
pub const NODE_NAME: &str = "node_name";
/// Name of the special annotation that holds the type of a node.
pub const NODE_TYPE: &str = "node_type";
/// Directory name used on disk for components whose layer name is empty.
pub const DEFAULT_EMPTY_LAYER: &str = "default_layer";

/// File name of the serialized global statistics inside a graph data folder.
const GLOBAL_STATISTICS_FILE_NAME: &str = "global_statistics.toml";

lazy_static! {
    /// The default annotation key, as created by `AnnoKey::default()`.
    pub static ref DEFAULT_ANNO_KEY: Arc<AnnoKey> = Arc::from(AnnoKey::default());
    /// Annotation key of the special `annis::node_name` annotation, which holds the name of a node.
    pub static ref NODE_NAME_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
        ns: ANNIS_NS.into(),
        name: NODE_NAME.into(),
    });
    /// Annotation key of the special `annis::node_type` annotation which every node must have to mark its existence.
    pub static ref NODE_TYPE_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
        ns: ANNIS_NS.into(),
        name: NODE_TYPE.into(),
    });
}
47
/// A representation of a graph including node annotations and edges.
/// Edges are partitioned into components and each component is implemented by a specialized graph storage implementation.
///
/// Graphs can have an optional location on the disk.
/// In this case, changes to the graph via the [`apply_update`](Self::apply_update) function are automatically persisted to this location.
///
pub struct Graph<CT: ComponentType> {
    /// Storage of all node annotations, either held in main memory or on disk.
    node_annos: Box<dyn NodeAnnotationStorage>,

    /// Location of the graph on disk (if any). Updates are persisted below this folder.
    location: Option<PathBuf>,

    /// All known components. A value of `None` means the component is known
    /// (e.g. it was found on disk) but its graph storage has not been loaded yet.
    components: BTreeMap<Component<CT>, Option<Arc<dyn GraphStorage>>>,
    /// ID of the last update event that has been applied to this graph.
    current_change_id: u64,

    /// Lock that ensures only one thread writes the graph to disk in the background at a time.
    background_persistance: Arc<Mutex<()>>,

    /// Optional global statistics, saved in a separate file next to the graph data.
    pub global_statistics: Option<CT::GlobalStatistics>,

    /// `true` if the node annotations use the on-disk annotation storage.
    disk_based: bool,
}
68
69fn load_component_from_disk(component_path: &Path) -> Result<Arc<dyn GraphStorage>> {
70    // load component into memory
71    let impl_path = PathBuf::from(component_path).join("impl.cfg");
72    let mut f_impl = std::fs::File::open(impl_path)?;
73    let mut impl_name = String::new();
74    f_impl.read_to_string(&mut impl_name)?;
75
76    let gs = registry::deserialize(&impl_name, component_path)?;
77
78    Ok(gs)
79}
80
81fn component_to_relative_path<CT: ComponentType>(c: &Component<CT>) -> PathBuf {
82    let mut p = PathBuf::new();
83    p.push("gs");
84    p.push(c.get_type().to_string());
85    p.push(if c.layer.is_empty() {
86        DEFAULT_EMPTY_LAYER
87    } else {
88        &c.layer
89    });
90    p.push(c.name.as_str());
91    p
92}
93
94fn component_path<CT: ComponentType>(
95    location: &Option<PathBuf>,
96    c: &Component<CT>,
97) -> Option<PathBuf> {
98    match location {
99        Some(loc) => {
100            let mut p = PathBuf::from(loc);
101            // Check if we need to load the component from the backup folder
102            let backup = loc.join("backup");
103            if backup.exists() {
104                p.push("backup");
105            } else {
106                p.push("current");
107            }
108            p.push(component_to_relative_path(c));
109            Some(p)
110        }
111        None => None,
112    }
113}
114
115/// List all the components that belong to corpus in the given directory.
116pub fn find_components_from_disk<CT: ComponentType, P: AsRef<Path>>(
117    location: P,
118) -> Result<BTreeSet<Component<CT>>> {
119    let mut result = BTreeSet::new();
120    // for all component types
121    for c in CT::all_component_types().into_iter() {
122        let cpath = PathBuf::from(location.as_ref())
123            .join("gs")
124            .join(c.to_string());
125
126        if cpath.is_dir() {
127            // get all the namespaces/layers
128            for layer in cpath.read_dir()? {
129                let layer = layer?;
130                if layer.path().is_dir() {
131                    // try to load the component with the empty name
132                    let layer_file_name = layer.file_name();
133                    let layer_name_from_file = layer_file_name.to_string_lossy();
134                    let layer_name = if layer_name_from_file == DEFAULT_EMPTY_LAYER {
135                        String::default()
136                    } else {
137                        layer_name_from_file.into()
138                    };
139                    let empty_name_component =
140                        Component::new(c.clone(), layer_name.clone(), String::default());
141                    {
142                        let cfg_file = PathBuf::from(location.as_ref())
143                            .join(component_to_relative_path(&empty_name_component))
144                            .join("impl.cfg");
145
146                        if cfg_file.is_file() {
147                            result.insert(empty_name_component.clone());
148                            debug!("Registered component {}", empty_name_component);
149                        }
150                    }
151                    // also load all named components
152                    for name in layer.path().read_dir()? {
153                        let name = name?;
154                        let named_component = Component::new(
155                            c.clone(),
156                            layer_name.clone(),
157                            name.file_name().to_string_lossy().into(),
158                        );
159                        let cfg_file = PathBuf::from(location.as_ref())
160                            .join(component_to_relative_path(&named_component))
161                            .join("impl.cfg");
162
163                        if cfg_file.is_file() {
164                            result.insert(named_component.clone());
165                            debug!("Registered component {}", named_component);
166                        }
167                    }
168                }
169            }
170        }
171    } // end for all components
172    Ok(result)
173}
174
175impl<CT: ComponentType> Graph<CT> {
176    /// Create a new and empty instance without any location on the disk.
177    pub fn new(disk_based: bool) -> Result<Self> {
178        let node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
179            Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
180        } else {
181            Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
182        };
183
184        Ok(Graph {
185            node_annos,
186            components: BTreeMap::new(),
187            global_statistics: None,
188            location: None,
189
190            current_change_id: 0,
191
192            background_persistance: Arc::new(Mutex::new(())),
193
194            disk_based,
195        })
196    }
197
198    /// Create a new instance without any location on the disk but with the default graph storage components.
199    pub fn with_default_graphstorages(disk_based: bool) -> Result<Self> {
200        let mut db = Graph::new(disk_based)?;
201        for c in CT::default_components() {
202            db.get_or_create_writable(&c)?;
203        }
204        Ok(db)
205    }
206
207    /// Opens the graph from an external location.
208    /// All updates will be persisted to this location.
209    ///
210    /// * `location` - The path on the disk
211    pub fn open(&mut self, location: &Path) -> Result<()> {
212        debug!("Opening corpus from {}", location.to_string_lossy());
213        self.clear()?;
214        self.location = Some(location.to_path_buf());
215        self.internal_open(location)?;
216
217        Ok(())
218    }
219
220    /// Overwrites all content with the graph at the external location. Updates
221    /// will *not* be persisted to this location and any old location will be
222    /// cleared.
223    ///
224    /// * `location` - The path on the disk
225    pub fn import(&mut self, location: &Path) -> Result<()> {
226        debug!("Importing corpus from {}", location.to_string_lossy());
227        self.clear()?;
228        // Set the path temoporary to the actual path, so loading the components will not fail
229        self.location = Some(location.to_path_buf());
230        self.internal_open(location)?;
231        self.ensure_loaded_all()?;
232        // Unlink the location again
233        self.location = None;
234        Ok(())
235    }
236
237    /// Load the graph from an external location.
238    /// This sets the location of this instance to the given location.
239    ///
240    /// * `location` - The path on the disk
241    /// * `preload` - If `true`, all components are loaded from disk into main memory.
242    #[deprecated(note = "Please use `open` instead")]
243    pub fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
244        self.open(location)?;
245        if preload {
246            self.ensure_loaded_all()?;
247        }
248        Ok(())
249    }
250
251    /// Internal helper function that loads the content from an external
252    /// location.
253    ///
254    /// It does not clear the current graph and does not alter the configured
255    /// location to persist the graph to.
256    fn internal_open(&mut self, location: &Path) -> Result<()> {
257        let backup = location.join("backup");
258
259        let mut load_from_backup = false;
260        let dir2load = if backup.exists() && backup.is_dir() {
261            load_from_backup = true;
262            backup.clone()
263        } else {
264            location.join("current")
265        };
266
267        // Get the global statistics if available
268        self.global_statistics = None;
269        let global_statistics_file = dir2load.join(GLOBAL_STATISTICS_FILE_NAME);
270        if global_statistics_file.exists() && global_statistics_file.is_file() {
271            let file_content = std::fs::read_to_string(global_statistics_file)?;
272            self.global_statistics = Some(toml::from_str(&file_content)?);
273        }
274
275        // Load the node annotations
276        let ondisk_subdirectory = dir2load.join(crate::annostorage::ondisk::SUBFOLDER_NAME);
277        if ondisk_subdirectory.exists() && ondisk_subdirectory.is_dir() {
278            self.disk_based = true;
279            // directly load the on disk storage from the given folder to avoid having a temporary directory
280            let node_annos_tmp =
281                crate::annostorage::ondisk::AnnoStorageImpl::new(Some(ondisk_subdirectory))?;
282            self.node_annos = Box::new(node_annos_tmp);
283        } else {
284            // assume a main memory implementation
285            self.disk_based = false;
286            let mut node_annos_tmp = crate::annostorage::inmemory::AnnoStorageImpl::new();
287            node_annos_tmp.load_annotations_from(&dir2load)?;
288            self.node_annos = Box::new(node_annos_tmp);
289        }
290
291        let log_path = dir2load.join("update_log.bin");
292
293        let logfile_exists = log_path.exists() && log_path.is_file();
294
295        self.components = find_components_from_disk(&dir2load)?
296            .into_iter()
297            .map(|c| (c, None))
298            .collect();
299
300        // If backup is active or a write log exists, always  a pre-load to get the complete corpus.
301        if logfile_exists | load_from_backup {
302            self.ensure_loaded_all()?;
303        }
304
305        if logfile_exists {
306            // apply any outstanding log file updates
307            let log_reader = std::fs::File::open(&log_path)?;
308            let mut update = bincode::deserialize_from(log_reader)?;
309            self.apply_update_in_memory(&mut update, true, |_| {})?;
310        } else {
311            self.current_change_id = 0;
312        }
313
314        if load_from_backup {
315            // save the current corpus under the actual location
316            self.save_to(&location.join("current"))?;
317            // rename backup folder (renaming is atomic and deleting could leave an incomplete backup folder on disk)
318            let tmp_dir = tempfile::Builder::new()
319                .prefix("temporary-graphannis-backup")
320                .tempdir_in(location)?;
321            // the target directory is created and can cause issues on windows: delete it first
322            std::fs::remove_dir(tmp_dir.path())?;
323            std::fs::rename(&backup, tmp_dir.path())?;
324            // remove it after renaming it
325            tmp_dir.close()?;
326        }
327
328        Ok(())
329    }
330
331    /// Save the current database to a `location` on the disk, but do not remember this location.
332    pub fn save_to(&mut self, location: &Path) -> Result<()> {
333        // make sure all components are loaded, otherwise saving them does not make any sense
334        self.ensure_loaded_all()?;
335        self.internal_save(&location.join("current"))
336    }
337
338    /// Save the current database at a new `location` and remember it as new internal location.
339    pub fn persist_to(&mut self, location: &Path) -> Result<()> {
340        self.location = Some(location.to_path_buf());
341        self.internal_save(&location.join("current"))
342    }
343
344    /// Clear the graph content.
345    /// This removes all node annotations, edges and knowledge about components.
346    fn clear(&mut self) -> Result<()> {
347        self.node_annos = Box::new(crate::annostorage::inmemory::AnnoStorageImpl::new());
348        self.components.clear();
349        Ok(())
350    }
351
352    fn internal_save(&self, location: &Path) -> Result<()> {
353        let location = PathBuf::from(location);
354
355        std::fs::create_dir_all(&location)?;
356
357        self.node_annos.save_annotations_to(&location)?;
358
359        for (c, e) in &self.components {
360            if let Some(ref data) = *e {
361                let dir = PathBuf::from(&location).join(component_to_relative_path(c));
362                std::fs::create_dir_all(&dir)?;
363
364                let impl_name = data.serialization_id();
365                data.save_to(&dir)?;
366
367                let cfg_path = PathBuf::from(&dir).join("impl.cfg");
368                let mut f_cfg = std::fs::File::create(cfg_path)?;
369                f_cfg.write_all(impl_name.as_bytes())?;
370            }
371        }
372
373        // Save global statistics
374        if let Some(s) = &self.global_statistics {
375            let file_content = toml::to_string(s)?;
376            std::fs::write(location.join(GLOBAL_STATISTICS_FILE_NAME), file_content)?;
377        }
378
379        Ok(())
380    }
381
382    fn get_cached_node_id_from_name(
383        &self,
384        node_name: Cow<str>,
385        cache: &mut CLruCache<String, Option<NodeID>>,
386    ) -> Result<Option<NodeID>> {
387        if let Some(id) = cache.get(node_name.as_ref()) {
388            Ok(*id)
389        } else {
390            let id = self.node_annos.get_node_id_from_name(&node_name)?;
391            cache.put(node_name.to_string(), id);
392            Ok(id)
393        }
394    }
395
396    #[allow(clippy::cognitive_complexity)]
397    fn apply_update_in_memory<F>(
398        &mut self,
399        u: &mut GraphUpdate,
400        update_statistics: bool,
401        progress_callback: F,
402    ) -> Result<()>
403    where
404        F: Fn(&str),
405    {
406        let all_components = self.get_all_components(None, None);
407
408        let mut update_graph_index = ComponentType::init_update_graph_index(self)?;
409        // Cache the expensive mapping of node names to IDs
410        let cache_size = NonZeroUsize::new(1_000).ok_or(GraphAnnisCoreError::ZeroCacheSize)?;
411        let mut node_id_cache = CLruCache::new(cache_size);
412        // Iterate once over all changes in the same order as the updates have been added
413        let total_nr_updates = u.len()?;
414        progress_callback(&format!("applying {} atomic updates", total_nr_updates));
415        for (nr_updates, update_event) in u.iter()?.enumerate() {
416            let (id, change) = update_event?;
417            trace!("applying event {:?}", &change);
418            ComponentType::before_update_event(&change, self, &mut update_graph_index)?;
419            match &change {
420                UpdateEvent::AddNode {
421                    node_name,
422                    node_type,
423                } => {
424                    // only add node if it does not exist yet
425                    if !self.node_annos.has_node_name(node_name)? {
426                        let new_node_id: NodeID =
427                            if let Some(id) = self.node_annos.get_largest_item()? {
428                                id + 1
429                            } else {
430                                0
431                            };
432
433                        let new_anno_name = Annotation {
434                            key: NODE_NAME_KEY.as_ref().clone(),
435                            val: node_name.into(),
436                        };
437                        let new_anno_type = Annotation {
438                            key: NODE_TYPE_KEY.as_ref().clone(),
439                            val: node_type.into(),
440                        };
441
442                        // add the new node (with minimum labels)
443                        self.node_annos.insert(new_node_id, new_anno_name)?;
444                        self.node_annos.insert(new_node_id, new_anno_type)?;
445
446                        // update the internal cache
447                        node_id_cache.put(node_name.clone(), Some(new_node_id));
448                    }
449                }
450                UpdateEvent::DeleteNode { node_name } => {
451                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
452                        Cow::Borrowed(node_name),
453                        &mut node_id_cache,
454                    )? {
455                        // delete all annotations
456                        self.node_annos.remove_item(&existing_node_id)?;
457
458                        // delete all edges pointing to this node either as source or target
459                        for c in all_components.iter() {
460                            if let Ok(gs) = self.get_or_create_writable(c) {
461                                gs.delete_node(existing_node_id)?;
462                            }
463                        }
464
465                        // update the internal cache
466                        node_id_cache.put(node_name.clone(), None);
467                    }
468                }
469                UpdateEvent::AddNodeLabel {
470                    node_name,
471                    anno_ns,
472                    anno_name,
473                    anno_value,
474                } => {
475                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
476                        Cow::Borrowed(node_name),
477                        &mut node_id_cache,
478                    )? {
479                        let anno = Annotation {
480                            key: AnnoKey {
481                                ns: anno_ns.into(),
482                                name: anno_name.into(),
483                            },
484                            val: anno_value.into(),
485                        };
486                        self.node_annos.insert(existing_node_id, anno)?;
487                    }
488                }
489                UpdateEvent::DeleteNodeLabel {
490                    node_name,
491                    anno_ns,
492                    anno_name,
493                } => {
494                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
495                        Cow::Borrowed(node_name),
496                        &mut node_id_cache,
497                    )? {
498                        let key = AnnoKey {
499                            ns: anno_ns.into(),
500                            name: anno_name.into(),
501                        };
502                        self.node_annos
503                            .remove_annotation_for_item(&existing_node_id, &key)?;
504                    }
505                }
506                UpdateEvent::AddEdge {
507                    source_node,
508                    target_node,
509                    layer,
510                    component_type,
511                    component_name,
512                } => {
513                    let source = self.get_cached_node_id_from_name(
514                        Cow::Borrowed(source_node),
515                        &mut node_id_cache,
516                    )?;
517                    let target = self.get_cached_node_id_from_name(
518                        Cow::Borrowed(target_node),
519                        &mut node_id_cache,
520                    )?;
521                    // only add edge if both nodes already exist
522                    if let (Some(source), Some(target)) = (source, target)
523                        && let Ok(ctype) = CT::from_str(component_type)
524                    {
525                        let c = Component::new(ctype, layer.into(), component_name.into());
526                        let gs = self.get_or_create_writable(&c)?;
527                        gs.add_edge(Edge { source, target })?;
528                    }
529                }
530                UpdateEvent::DeleteEdge {
531                    source_node,
532                    target_node,
533                    layer,
534                    component_type,
535                    component_name,
536                } => {
537                    let source = self.get_cached_node_id_from_name(
538                        Cow::Borrowed(source_node),
539                        &mut node_id_cache,
540                    )?;
541                    let target = self.get_cached_node_id_from_name(
542                        Cow::Borrowed(target_node),
543                        &mut node_id_cache,
544                    )?;
545                    if let (Some(source), Some(target)) = (source, target)
546                        && let Ok(ctype) = CT::from_str(component_type)
547                    {
548                        let c = Component::new(ctype, layer.into(), component_name.into());
549
550                        let gs = self.get_or_create_writable(&c)?;
551                        gs.delete_edge(&Edge { source, target })?;
552                    }
553                }
554                UpdateEvent::AddEdgeLabel {
555                    source_node,
556                    target_node,
557                    layer,
558                    component_type,
559                    component_name,
560                    anno_ns,
561                    anno_name,
562                    anno_value,
563                } => {
564                    let source = self.get_cached_node_id_from_name(
565                        Cow::Borrowed(source_node),
566                        &mut node_id_cache,
567                    )?;
568                    let target = self.get_cached_node_id_from_name(
569                        Cow::Borrowed(target_node),
570                        &mut node_id_cache,
571                    )?;
572                    if let (Some(source), Some(target)) = (source, target)
573                        && let Ok(ctype) = CT::from_str(component_type)
574                    {
575                        let c = Component::new(ctype, layer.into(), component_name.into());
576                        let gs = self.get_or_create_writable(&c)?;
577                        // only add label if the edge already exists
578                        let e = Edge { source, target };
579                        if gs.is_connected(source, target, 1, Included(1))? {
580                            let anno = Annotation {
581                                key: AnnoKey {
582                                    ns: anno_ns.into(),
583                                    name: anno_name.into(),
584                                },
585                                val: anno_value.into(),
586                            };
587                            gs.add_edge_annotation(e, anno)?;
588                        }
589                    }
590                }
591                UpdateEvent::DeleteEdgeLabel {
592                    source_node,
593                    target_node,
594                    layer,
595                    component_type,
596                    component_name,
597                    anno_ns,
598                    anno_name,
599                } => {
600                    let source = self.get_cached_node_id_from_name(
601                        Cow::Borrowed(source_node),
602                        &mut node_id_cache,
603                    )?;
604                    let target = self.get_cached_node_id_from_name(
605                        Cow::Borrowed(target_node),
606                        &mut node_id_cache,
607                    )?;
608                    if let (Some(source), Some(target)) = (source, target)
609                        && let Ok(ctype) = CT::from_str(component_type)
610                    {
611                        let c = Component::new(ctype, layer.into(), component_name.into());
612                        let gs = self.get_or_create_writable(&c)?;
613                        // only add label if the edge already exists
614                        let e = Edge { source, target };
615                        if gs.is_connected(source, target, 1, Included(1))? {
616                            let key = AnnoKey {
617                                ns: anno_ns.into(),
618                                name: anno_name.into(),
619                            };
620                            gs.delete_edge_annotation(&e, &key)?;
621                        }
622                    }
623                }
624            } // end match update entry type
625            ComponentType::after_update_event(change, self, &mut update_graph_index)?;
626            self.current_change_id = id;
627
628            if nr_updates > 0 && nr_updates % 100_000 == 0 {
629                // Get progress in percentage
630                let progress = ((nr_updates as f64) / (total_nr_updates as f64)) * 100.0;
631                progress_callback(&format!(
632                    "applied {:.2}% of the atomic updates ({}/{})",
633                    progress, nr_updates, total_nr_updates,
634                ));
635            }
636        } // end for each consistent update entry
637
638        if update_statistics {
639            progress_callback("calculating all statistics");
640            self.calculate_all_statistics()?;
641        }
642
643        progress_callback("extending graph with model-specific index");
644        ComponentType::apply_update_graph_index(update_graph_index, self)?;
645
646        Ok(())
647    }
648
649    /// Apply a sequence of updates (`u` parameter) to this graph.
650    /// If the graph has a location on the disk, the changes are persisted.
651    pub fn apply_update<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
652    where
653        F: Fn(&str),
654    {
655        progress_callback("applying list of atomic updates");
656
657        // we have to make sure that the corpus is fully loaded (with all components) before we can apply the update.
658        self.ensure_loaded_all()?;
659
660        let result = self.apply_update_in_memory(u, true, &progress_callback);
661        progress_callback("memory updates completed, persisting updates to disk");
662        self.persist_updates(u, result, progress_callback)?;
663        Ok(())
664    }
665
666    /// Apply a sequence of updates (`u` parameter) to this graph but do not update the graph statistics.
667    /// If the graph has a location on the disk, the changes are persisted.
668    pub fn apply_update_keep_statistics<F>(
669        &mut self,
670        u: &mut GraphUpdate,
671        progress_callback: F,
672    ) -> Result<()>
673    where
674        F: Fn(&str),
675    {
676        progress_callback("applying list of atomic updates");
677
678        // we have to make sure that the corpus is fully loaded (with all components) before we can apply the update.
679        self.ensure_loaded_all()?;
680
681        let result = self.apply_update_in_memory(u, false, &progress_callback);
682        progress_callback("memory updates completed, persisting updates to disk");
683        self.persist_updates(u, result, progress_callback)?;
684        Ok(())
685    }
686
687    fn persist_updates<F>(
688        &mut self,
689        u: &mut GraphUpdate,
690        apply_update_result: Result<()>,
691        progress_callback: F,
692    ) -> Result<()>
693    where
694        F: Fn(&str),
695    {
696        if let Some(location) = self.location.clone() {
697            trace!("output location for persisting updates is {:?}", location);
698            if apply_update_result.is_ok() {
699                let current_path = location.join("current");
700                // make sure the output path exits
701                std::fs::create_dir_all(&current_path)?;
702
703                // If successfull write log
704                let log_path = current_path.join("update_log.bin");
705
706                // Create a temporary directory in the same file system as the output
707                let temporary_dir = tempfile::tempdir_in(&current_path)?;
708                let mut temporary_disk_file = tempfile::NamedTempFile::new_in(&temporary_dir)?;
709
710                debug!("writing WAL update log to {:?}", temporary_disk_file.path());
711                bincode::serialize_into(temporary_disk_file.as_file(), &u)?;
712                temporary_disk_file.flush()?;
713                debug!("moving finished WAL update log to {:?}", &log_path);
714                // Since the temporary file should be on the same file system, persisting/moving it should be an atomic operation
715                temporary_disk_file.persist(&log_path)?;
716
717                progress_callback("finished writing WAL update log");
718            } else {
719                trace!(
720                    "error occured while applying updates: {:?}",
721                    &apply_update_result
722                );
723                // load corpus from disk again
724                self.open(&location)?;
725                self.ensure_loaded_all()?;
726            }
727        }
728
729        apply_update_result
730    }
731
732    /// A function to persist the changes of a write-ahead-log update on the disk. Should be run in a background thread.
733    pub fn background_sync_wal_updates(&self) -> Result<()> {
734        // TODO: friendly abort any currently running thread
735
736        if let Some(ref location) = self.location {
737            // Acquire lock, so that only one thread can write background data at the same time
738            let _lock = self.background_persistance.lock()?;
739
740            self.internal_save_with_backup(location)?;
741        }
742
743        Ok(())
744    }
745
746    /// Save this graph to the given location using a temporary backup folder for the old graph.
747    /// The backup folder is used to achieve some atomicity in combination with the `load_from` logic,
748    // which will load the backup folder in case saving the corpus to the "current" location was aborted.
749    fn internal_save_with_backup(&self, location: &Path) -> Result<()> {
750        // Move the old corpus to the backup sub-folder. When the corpus is loaded again and there is backup folder
751        // the backup will be used instead of the original possible corrupted files.
752        // The current version is only the real one if no backup folder exists. If there is a backup folder
753        // there is nothing to do since the backup already contains the last consistent version.
754        // A sub-folder is used to ensure that all directories are on the same file system and moving (instead of copying)
755        // is possible.
756        let backup_location = location.join("backup");
757        let current_location = location.join("current");
758        if !backup_location.exists() {
759            std::fs::rename(&current_location, &backup_location)?;
760        }
761
762        // Save the complete corpus without the write log to the target location
763        self.internal_save(&current_location)?;
764
765        // rename backup folder (renaming is atomic and deleting could leave an incomplete backup folder on disk)
766        let tmp_dir = tempfile::Builder::new()
767            .prefix("temporary-graphannis-backup")
768            .tempdir_in(location)?;
769        // the target directory is created and can cause issues on windows: delete it first
770        std::fs::remove_dir(tmp_dir.path())?;
771        std::fs::rename(&backup_location, tmp_dir.path())?;
772        // remove it after renaming it, (since the new "current" folder was completely written)
773        tmp_dir.close()?;
774        Ok(())
775    }
776
777    fn ensure_writeable(&mut self, c: &Component<CT>) -> Result<()> {
778        self.ensure_loaded(c)?;
779        // Short path: exists and already writable
780        if let Some(gs_opt) = self.components.get_mut(c) {
781            // This should always work, since we just ensured the component is loaded
782            let gs = gs_opt
783                .as_mut()
784                .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
785            // copy to writable implementation if needed
786            let is_writable = {
787                Arc::get_mut(gs)
788                    .ok_or_else(|| {
789                        GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string())
790                    })?
791                    .as_writeable()
792                    .is_some()
793            };
794            if is_writable {
795                return Ok(());
796            }
797        } else {
798            // Component does not exist at all, we can abort here
799            return Ok(());
800        }
801
802        // Component does exist, but is not writable, replace with writeable implementation
803        let readonly_gs = self
804            .components
805            .get(c)
806            .cloned()
807            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
808            .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
809        let writable_gs = registry::create_writeable(self, Some(readonly_gs.as_ref()))?;
810        self.components.insert(c.to_owned(), Some(writable_gs));
811
812        Ok(())
813    }
814
815    /// (Re-) calculate the internal statistics needed for estimating graph components and annotations.
816    pub fn calculate_all_statistics(&mut self) -> Result<()> {
817        self.ensure_loaded_all()?;
818
819        debug!("Calculating node statistics");
820        self.node_annos.calculate_statistics()?;
821        for c in self.get_all_components(None, None) {
822            debug!("Calculating statistics for component {}", &c);
823            self.calculate_component_statistics(&c)?;
824        }
825
826        debug!("Calculating global graph statistics");
827        CT::calculate_global_statistics(self)?;
828
829        Ok(())
830    }
831
832    /// Makes sure the statistics for the given component are up-to-date.
833    pub fn calculate_component_statistics(&mut self, c: &Component<CT>) -> Result<()> {
834        let mut result: Result<()> = Ok(());
835        let mut entry = self
836            .components
837            .remove(c)
838            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?;
839        if let Some(ref mut gs) = entry {
840            if let Some(gs_mut) = Arc::get_mut(gs) {
841                // Since immutable graph storages can't change, only writable graph storage statistics need to be re-calculated
842                if let Some(writeable_gs) = gs_mut.as_writeable() {
843                    writeable_gs.calculate_statistics()?;
844                }
845            } else {
846                result = Err(GraphAnnisCoreError::NonExclusiveComponentReference(
847                    c.to_string(),
848                ));
849            }
850        }
851        // re-insert component entry
852        self.components.insert(c.clone(), entry);
853        result
854    }
855
856    /// Gets the the given component.
857    /// If the component does not exist yet, it creates a  new empty one.
858    /// If the existing component is non-writable, a writable copy of it is created and returned.
859    pub fn get_or_create_writable(
860        &mut self,
861        c: &Component<CT>,
862    ) -> Result<&mut dyn WriteableGraphStorage> {
863        if self.components.contains_key(c) {
864            // make sure the component is actually writable and loaded
865            self.ensure_writeable(c)?;
866        } else {
867            let w = registry::create_writeable(self, None)?;
868
869            self.components.insert(c.clone(), Some(w));
870        }
871
872        // get and return the reference to the entry
873        let entry: &mut Arc<dyn GraphStorage> = self
874            .components
875            .get_mut(c)
876            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
877            .as_mut()
878            .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
879
880        let gs_mut_ref: &mut dyn GraphStorage = Arc::get_mut(entry)
881            .ok_or_else(|| GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string()))?;
882        let result = gs_mut_ref
883            .as_writeable()
884            .ok_or_else(|| GraphAnnisCoreError::ReadOnlyComponent(c.to_string()))?;
885        Ok(result)
886    }
887
888    /// Returns `true` if the graph storage for this specific component is loaded and ready to use.
889    pub fn is_loaded(&self, c: &Component<CT>) -> bool {
890        let entry: Option<&Option<Arc<dyn GraphStorage>>> = self.components.get(c);
891        if let Some(gs_opt) = entry
892            && gs_opt.is_some()
893        {
894            return true;
895        }
896        false
897    }
898
899    /// Ensure that the graph storages for all component are loaded and ready to use.
900    pub fn ensure_loaded_all(&mut self) -> Result<()> {
901        let mut components_to_load: Vec<_> = Vec::with_capacity(self.components.len());
902
903        // colllect all missing components
904        for (c, gs) in &self.components {
905            if gs.is_none() {
906                components_to_load.push(c.clone());
907            }
908        }
909
910        self.ensure_loaded_parallel(&components_to_load)?;
911        Ok(())
912    }
913
914    /// Ensure that the graph storage for a specific component is loaded and ready to use.
915    pub fn ensure_loaded(&mut self, c: &Component<CT>) -> Result<()> {
916        // We only load known components, so check the map if the entry exists
917        if let Some(gs_opt) = self.components.get_mut(c) {
918            // If this is none, the component is known but not loaded
919            if gs_opt.is_none() {
920                let component_path = component_path(&self.location, c)
921                    .ok_or(GraphAnnisCoreError::EmptyComponentPath)?;
922                debug!(
923                    "loading component {} from {}",
924                    c,
925                    &component_path.to_string_lossy()
926                );
927                let component = load_component_from_disk(&component_path)?;
928                gs_opt.get_or_insert_with(|| component);
929            }
930        }
931        Ok(())
932    }
933
934    /// Ensure that the graph storage for a the given component is loaded and ready to use.
935    /// Loading is done in parallel.
936    ///
937    /// Returns the list of actually loaded (and existing) components.
938    pub fn ensure_loaded_parallel(
939        &mut self,
940        components_to_load: &[Component<CT>],
941    ) -> Result<Vec<Component<CT>>> {
942        // We only load known components, so check the map if the entry exists
943        // and that is not loaded yet.
944        let components_to_load: Vec<_> = components_to_load
945            .iter()
946            .filter(|c| {
947                if let Some(e) = self.components.get(c) {
948                    e.is_none()
949                } else {
950                    false
951                }
952            })
953            .collect();
954
955        // load missing components in parallel
956        let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
957            .into_par_iter()
958            .map(|c| match component_path(&self.location, c) {
959                Some(cpath) => {
960                    debug!(
961                        "loading component in parallel {} from {}",
962                        c,
963                        &cpath.to_string_lossy()
964                    );
965                    (c, load_component_from_disk(&cpath))
966                }
967                None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
968            })
969            .collect();
970
971        // insert all the loaded components
972        let mut result = Vec::with_capacity(loaded_components.len());
973        for (c, gs) in loaded_components {
974            let gs = gs?;
975            self.components.insert(c.clone(), Some(gs));
976            result.push(c.clone());
977        }
978        Ok(result)
979    }
980
981    pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
982        self.ensure_loaded_all()?;
983
984        if self.disk_based != disk_based {
985            self.disk_based = disk_based;
986
987            // Change the node annotation implementation
988            let mut new_node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
989                Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
990            } else {
991                Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
992            };
993
994            // Copy all annotations for all nodes
995            info!("copying node annotations");
996            for m in self
997                .node_annos
998                .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Any)
999            {
1000                let m = m?;
1001                for anno in self.node_annos.get_annotations_for_item(&m.node)? {
1002                    new_node_annos.insert(m.node, anno)?;
1003                }
1004            }
1005            self.node_annos = new_node_annos;
1006        }
1007
1008        info!("re-calculating all statistics");
1009        self.calculate_all_statistics()?;
1010
1011        for c in self.get_all_components(None, None) {
1012            // Perform the optimization if necessary
1013            info!("optimizing implementation for component {}", &c);
1014            self.optimize_gs_impl(&c)?;
1015        }
1016        if let Some(location) = &self.location {
1017            info!("saving corpus to disk");
1018            self.internal_save_with_backup(location)?;
1019        }
1020        Ok(())
1021    }
1022
1023    pub fn optimize_gs_impl(&mut self, c: &Component<CT>) -> Result<()> {
1024        if let Some(gs) = self.get_graphstorage(c)
1025            && let Some(stats) = gs.get_statistics()
1026        {
1027            let opt_info = registry::get_optimal_impl_heuristic(self, stats);
1028
1029            // convert if necessary
1030            if opt_info.id != gs.serialization_id() {
1031                let mut new_gs = registry::create_from_info(&opt_info)?;
1032                let converted = if let Some(new_gs_mut) = Arc::get_mut(&mut new_gs) {
1033                    info!(
1034                        "converting component {} to implementation {}",
1035                        c, opt_info.id,
1036                    );
1037                    new_gs_mut.copy(self.get_node_annos(), gs.as_ref())?;
1038                    true
1039                } else {
1040                    false
1041                };
1042                if converted {
1043                    // insert into components map
1044                    info!(
1045                        "finished conversion of component {} to implementation {}",
1046                        c, opt_info.id,
1047                    );
1048                    self.components.insert(c.clone(), Some(new_gs.clone()));
1049                }
1050            }
1051        }
1052
1053        Ok(())
1054    }
1055
1056    /// Get a read-only graph storage copy for the given component `c`.
1057    pub fn get_graphstorage(&self, c: &Component<CT>) -> Option<Arc<dyn GraphStorage>> {
1058        // get and return the reference to the entry if loaded
1059        let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1060        Some(entry.clone())
1061    }
1062
1063    /// Get a read-only graph storage reference for the given component `c`.
1064    pub fn get_graphstorage_as_ref<'a>(
1065        &'a self,
1066        c: &Component<CT>,
1067    ) -> Option<&'a dyn GraphStorage> {
1068        // get and return the reference to the entry if loaded
1069        let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1070        Some(entry.as_ref())
1071    }
1072
1073    /// Get a read-only reference to the node annotations of this graph
1074    pub fn get_node_annos(&self) -> &dyn NodeAnnotationStorage {
1075        self.node_annos.as_ref()
1076    }
1077
1078    /// Get a mutable reference to the node annotations of this graph
1079    pub fn get_node_annos_mut(&mut self) -> &mut dyn NodeAnnotationStorage {
1080        self.node_annos.as_mut()
1081    }
1082
1083    /// Returns all components of the graph given an optional type (`ctype`) and `name`.
1084    /// This allows to filter which components to receive.
1085    /// If you want to retrieve all components, use `None` as value for both arguments.
1086    pub fn get_all_components(&self, ctype: Option<CT>, name: Option<&str>) -> Vec<Component<CT>> {
1087        if let (Some(ctype), Some(name)) = (&ctype, name) {
1088            // lookup component from sorted map
1089            let mut result: Vec<_> = Vec::new();
1090            let ckey = Component::new(ctype.clone(), String::default(), name.into());
1091
1092            for (c, _) in self.components.range(ckey..) {
1093                if c.name != name || &c.get_type() != ctype {
1094                    break;
1095                }
1096                result.push(c.clone());
1097            }
1098            result
1099        } else if let Some(ctype) = &ctype {
1100            // lookup component from sorted map
1101            let mut result: Vec<_> = Vec::new();
1102            let ckey = Component::new(ctype.clone(), String::default(), String::default());
1103
1104            for (c, _) in self.components.range(ckey..) {
1105                if &c.get_type() != ctype {
1106                    break;
1107                }
1108                result.push(c.clone());
1109            }
1110            result
1111        } else {
1112            // filter all entries
1113            let filtered_components = self
1114                .components
1115                .keys()
1116                .filter(move |c| {
1117                    if let Some(ctype) = ctype.clone()
1118                        && ctype != c.get_type()
1119                    {
1120                        return false;
1121                    }
1122                    if let Some(name) = name
1123                        && name != c.name
1124                    {
1125                        return false;
1126                    }
1127                    true
1128                })
1129                .cloned();
1130            filtered_components.collect()
1131        }
1132    }
1133}
1134
1135#[cfg(test)]
1136mod tests;