graphannis_core/graph/
mod.rs

1pub mod serialization;
2pub mod storage;
3pub mod update;
4
5use crate::{
6    annostorage::{AnnotationStorage, NodeAnnotationStorage, ValueSearch},
7    errors::Result,
8    graph::storage::{registry, GraphStorage, WriteableGraphStorage},
9};
10use crate::{
11    errors::GraphAnnisCoreError,
12    types::{AnnoKey, Annotation, Component, ComponentType, Edge, NodeID},
13};
14use clru::CLruCache;
15use rayon::prelude::*;
16use smartstring::alias::String as SmartString;
17use std::io::prelude::*;
18use std::ops::Bound::Included;
19use std::path::{Path, PathBuf};
20use std::string::ToString;
21use std::{
22    borrow::Cow,
23    sync::{Arc, Mutex},
24};
25use std::{collections::BTreeMap, num::NonZeroUsize};
26use update::{GraphUpdate, UpdateEvent};
27
/// Namespace of the annotations graphANNIS manages itself, e.g. [`NODE_NAME`] and [`NODE_TYPE`].
pub const ANNIS_NS: &str = "annis";
/// NOTE(review): not referenced in this part of the file; presumably the namespace
/// used for annotations that do not specify one — confirm at the call sites.
pub const DEFAULT_NS: &str = "default_ns";
/// Name of the annotation (in the [`ANNIS_NS`] namespace) holding the name by which
/// update events refer to a node.
pub const NODE_NAME: &str = "node_name";
/// Name of the annotation (in the [`ANNIS_NS`] namespace) holding the type of a node.
pub const NODE_TYPE: &str = "node_type";
/// Directory name used on disk for components whose layer is empty.
pub const DEFAULT_EMPTY_LAYER: &str = "default_layer";

/// File name (inside the `current` or `backup` folder) of the serialized global statistics.
const GLOBAL_STATISTICS_FILE_NAME: &str = "global_statistics.toml";
35
36lazy_static! {
37    pub static ref DEFAULT_ANNO_KEY: Arc<AnnoKey> = Arc::from(AnnoKey::default());
38    pub static ref NODE_NAME_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
39        ns: ANNIS_NS.into(),
40        name: NODE_NAME.into(),
41    });
42    /// Return an annotation key which is used for the special `annis::node_type` annotation which every node must have to mark its existence.
43    pub static ref NODE_TYPE_KEY: Arc<AnnoKey> = Arc::from(AnnoKey {
44        ns: ANNIS_NS.into(),
45        name: NODE_TYPE.into(),
46    });
47}
48
49/// A representation of a graph including node annotations and edges.
50/// Edges are partioned into components and each component is implemented by specialized graph storage implementation.
51///
52/// Graphs can have an optional location on the disk.
53/// In this case, changes to the graph via the [apply_update(...)](#method.apply_update) function are automatically persisted to this location.
54///
55pub struct Graph<CT: ComponentType> {
56    node_annos: Box<dyn NodeAnnotationStorage>,
57
58    location: Option<PathBuf>,
59
60    components: BTreeMap<Component<CT>, Option<Arc<dyn GraphStorage>>>,
61    current_change_id: u64,
62
63    background_persistance: Arc<Mutex<()>>,
64
65    pub global_statistics: Option<CT::GlobalStatistics>,
66
67    disk_based: bool,
68}
69
70fn load_component_from_disk(component_path: &Path) -> Result<Arc<dyn GraphStorage>> {
71    // load component into memory
72    let impl_path = PathBuf::from(component_path).join("impl.cfg");
73    let mut f_impl = std::fs::File::open(impl_path)?;
74    let mut impl_name = String::new();
75    f_impl.read_to_string(&mut impl_name)?;
76
77    let gs = registry::deserialize(&impl_name, component_path)?;
78
79    Ok(gs)
80}
81
82fn component_to_relative_path<CT: ComponentType>(c: &Component<CT>) -> PathBuf {
83    let mut p = PathBuf::new();
84    p.push("gs");
85    p.push(c.get_type().to_string());
86    p.push(if c.layer.is_empty() {
87        DEFAULT_EMPTY_LAYER
88    } else {
89        &c.layer
90    });
91    p.push(c.name.as_str());
92    p
93}
94
95fn component_path<CT: ComponentType>(
96    location: &Option<PathBuf>,
97    c: &Component<CT>,
98) -> Option<PathBuf> {
99    match location {
100        Some(ref loc) => {
101            let mut p = PathBuf::from(loc);
102            // Check if we need to load the component from the backup folder
103            let backup = loc.join("backup");
104            if backup.exists() {
105                p.push("backup");
106            } else {
107                p.push("current");
108            }
109            p.push(component_to_relative_path(c));
110            Some(p)
111        }
112        None => None,
113    }
114}
115
116impl<CT: ComponentType> Graph<CT> {
117    /// Create a new and empty instance without any location on the disk.
118    pub fn new(disk_based: bool) -> Result<Self> {
119        let node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
120            Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
121        } else {
122            Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
123        };
124
125        Ok(Graph {
126            node_annos,
127            components: BTreeMap::new(),
128            global_statistics: None,
129            location: None,
130
131            current_change_id: 0,
132
133            background_persistance: Arc::new(Mutex::new(())),
134
135            disk_based,
136        })
137    }
138
139    /// Create a new instance without any location on the disk but with the default graph storage components.
140    pub fn with_default_graphstorages(disk_based: bool) -> Result<Self> {
141        let mut db = Graph::new(disk_based)?;
142        for c in CT::default_components() {
143            db.get_or_create_writable(&c)?;
144        }
145        Ok(db)
146    }
147
148    /// Opens the graph from an external location.
149    /// All updates will be persisted to this location.
150    ///
151    /// * `location` - The path on the disk
152    pub fn open(&mut self, location: &Path) -> Result<()> {
153        debug!("Opening corpus from {}", location.to_string_lossy());
154        self.clear()?;
155        self.location = Some(location.to_path_buf());
156        self.internal_open(location)?;
157
158        Ok(())
159    }
160
161    /// Overwrites all content with the graph at the external location. Updates
162    /// will *not* be persisted to this location and any old location will be
163    /// cleared.
164    ///
165    /// * `location` - The path on the disk
166    pub fn import(&mut self, location: &Path) -> Result<()> {
167        debug!("Importing corpus from {}", location.to_string_lossy());
168        self.clear()?;
169        // Set the path temoporary to the actual path, so loading the components will not fail
170        self.location = Some(location.to_path_buf());
171        self.internal_open(location)?;
172        self.ensure_loaded_all()?;
173        // Unlink the location again
174        self.location = None;
175        Ok(())
176    }
177
178    /// Load the graph from an external location.
179    /// This sets the location of this instance to the given location.
180    ///
181    /// * `location` - The path on the disk
182    /// * `preload` - If `true`, all components are loaded from disk into main memory.
183    #[deprecated(note = "Please use `open` instead")]
184    pub fn load_from(&mut self, location: &Path, preload: bool) -> Result<()> {
185        self.open(location)?;
186        if preload {
187            self.ensure_loaded_all()?;
188        }
189        Ok(())
190    }
191
192    /// Internal helper function that loads the content from an external
193    /// location.
194    ///
195    /// It does not clear the current graph and does not alter the configured
196    /// location to persist the graph to.
197    fn internal_open(&mut self, location: &Path) -> Result<()> {
198        let backup = location.join("backup");
199
200        let mut load_from_backup = false;
201        let dir2load = if backup.exists() && backup.is_dir() {
202            load_from_backup = true;
203            backup.clone()
204        } else {
205            location.join("current")
206        };
207
208        // Get the global statistics if available
209        self.global_statistics = None;
210        let global_statistics_file = dir2load.join(GLOBAL_STATISTICS_FILE_NAME);
211        if global_statistics_file.exists() && global_statistics_file.is_file() {
212            let file_content = std::fs::read_to_string(global_statistics_file)?;
213            self.global_statistics = Some(toml::from_str(&file_content)?);
214        }
215
216        // Load the node annotations
217        let ondisk_subdirectory = dir2load.join(crate::annostorage::ondisk::SUBFOLDER_NAME);
218        if ondisk_subdirectory.exists() && ondisk_subdirectory.is_dir() {
219            self.disk_based = true;
220            // directly load the on disk storage from the given folder to avoid having a temporary directory
221            let node_annos_tmp =
222                crate::annostorage::ondisk::AnnoStorageImpl::new(Some(ondisk_subdirectory))?;
223            self.node_annos = Box::new(node_annos_tmp);
224        } else {
225            // assume a main memory implementation
226            self.disk_based = false;
227            let mut node_annos_tmp = crate::annostorage::inmemory::AnnoStorageImpl::new();
228            node_annos_tmp.load_annotations_from(&dir2load)?;
229            self.node_annos = Box::new(node_annos_tmp);
230        }
231
232        let log_path = dir2load.join("update_log.bin");
233
234        let logfile_exists = log_path.exists() && log_path.is_file();
235
236        self.find_components_from_disk(&dir2load)?;
237
238        // If backup is active or a write log exists, always  a pre-load to get the complete corpus.
239        if logfile_exists | load_from_backup {
240            self.ensure_loaded_all()?;
241        }
242
243        if logfile_exists {
244            // apply any outstanding log file updates
245            let log_reader = std::fs::File::open(&log_path)?;
246            let mut update = bincode::deserialize_from(log_reader)?;
247            self.apply_update_in_memory(&mut update, true, |_| {})?;
248        } else {
249            self.current_change_id = 0;
250        }
251
252        if load_from_backup {
253            // save the current corpus under the actual location
254            self.save_to(&location.join("current"))?;
255            // rename backup folder (renaming is atomic and deleting could leave an incomplete backup folder on disk)
256            let tmp_dir = tempfile::Builder::new()
257                .prefix("temporary-graphannis-backup")
258                .tempdir_in(location)?;
259            // the target directory is created and can cause issues on windows: delete it first
260            std::fs::remove_dir(tmp_dir.path())?;
261            std::fs::rename(&backup, tmp_dir.path())?;
262            // remove it after renaming it
263            tmp_dir.close()?;
264        }
265
266        Ok(())
267    }
268
269    /// Save the current database to a `location` on the disk, but do not remember this location.
270    pub fn save_to(&mut self, location: &Path) -> Result<()> {
271        // make sure all components are loaded, otherwise saving them does not make any sense
272        self.ensure_loaded_all()?;
273        self.internal_save(&location.join("current"))
274    }
275
276    /// Save the current database at a new `location` and remember it as new internal location.
277    pub fn persist_to(&mut self, location: &Path) -> Result<()> {
278        self.location = Some(location.to_path_buf());
279        self.internal_save(&location.join("current"))
280    }
281
282    /// Clear the graph content.
283    /// This removes all node annotations, edges and knowledge about components.
284    fn clear(&mut self) -> Result<()> {
285        self.node_annos = Box::new(crate::annostorage::inmemory::AnnoStorageImpl::new());
286        self.components.clear();
287        Ok(())
288    }
289
290    fn find_components_from_disk(&mut self, location: &Path) -> Result<()> {
291        self.components.clear();
292
293        // for all component types
294        for c in CT::all_component_types().into_iter() {
295            let cpath = PathBuf::from(location).join("gs").join(c.to_string());
296
297            if cpath.is_dir() {
298                // get all the namespaces/layers
299                for layer in cpath.read_dir()? {
300                    let layer = layer?;
301                    if layer.path().is_dir() {
302                        // try to load the component with the empty name
303                        let layer_file_name = layer.file_name();
304                        let layer_name_from_file = layer_file_name.to_string_lossy();
305                        let layer_name: SmartString = if layer_name_from_file == DEFAULT_EMPTY_LAYER
306                        {
307                            SmartString::default()
308                        } else {
309                            layer_name_from_file.into()
310                        };
311                        let empty_name_component =
312                            Component::new(c.clone(), layer_name.clone(), SmartString::default());
313                        {
314                            let cfg_file = PathBuf::from(location)
315                                .join(component_to_relative_path(&empty_name_component))
316                                .join("impl.cfg");
317
318                            if cfg_file.is_file() {
319                                self.components.insert(empty_name_component.clone(), None);
320                                debug!("Registered component {}", empty_name_component);
321                            }
322                        }
323                        // also load all named components
324                        for name in layer.path().read_dir()? {
325                            let name = name?;
326                            let named_component = Component::new(
327                                c.clone(),
328                                layer_name.clone(),
329                                name.file_name().to_string_lossy().into(),
330                            );
331                            let cfg_file = PathBuf::from(location)
332                                .join(component_to_relative_path(&named_component))
333                                .join("impl.cfg");
334
335                            if cfg_file.is_file() {
336                                self.components.insert(named_component.clone(), None);
337                                debug!("Registered component {}", named_component);
338                            }
339                        }
340                    }
341                }
342            }
343        } // end for all components
344        Ok(())
345    }
346
347    fn internal_save(&self, location: &Path) -> Result<()> {
348        let location = PathBuf::from(location);
349
350        std::fs::create_dir_all(&location)?;
351
352        self.node_annos.save_annotations_to(&location)?;
353
354        for (c, e) in &self.components {
355            if let Some(ref data) = *e {
356                let dir = PathBuf::from(&location).join(component_to_relative_path(c));
357                std::fs::create_dir_all(&dir)?;
358
359                let impl_name = data.serialization_id();
360                data.save_to(&dir)?;
361
362                let cfg_path = PathBuf::from(&dir).join("impl.cfg");
363                let mut f_cfg = std::fs::File::create(cfg_path)?;
364                f_cfg.write_all(impl_name.as_bytes())?;
365            }
366        }
367
368        // Save global statistics
369        if let Some(s) = &self.global_statistics {
370            let file_content = toml::to_string(s)?;
371            std::fs::write(location.join(GLOBAL_STATISTICS_FILE_NAME), file_content)?;
372        }
373
374        Ok(())
375    }
376
377    fn get_cached_node_id_from_name(
378        &self,
379        node_name: Cow<String>,
380        cache: &mut CLruCache<String, Option<NodeID>>,
381    ) -> Result<Option<NodeID>> {
382        if let Some(id) = cache.get(node_name.as_ref()) {
383            Ok(*id)
384        } else {
385            let id = self.node_annos.get_node_id_from_name(&node_name)?;
386            cache.put(node_name.to_string(), id);
387            Ok(id)
388        }
389    }
390
391    #[allow(clippy::cognitive_complexity)]
392    fn apply_update_in_memory<F>(
393        &mut self,
394        u: &mut GraphUpdate,
395        update_statistics: bool,
396        progress_callback: F,
397    ) -> Result<()>
398    where
399        F: Fn(&str),
400    {
401        let all_components = self.get_all_components(None, None);
402
403        let mut update_graph_index = ComponentType::init_update_graph_index(self)?;
404        // Cache the expensive mapping of node names to IDs
405        let cache_size = NonZeroUsize::new(1_000).ok_or(GraphAnnisCoreError::ZeroCacheSize)?;
406        let mut node_id_cache = CLruCache::new(cache_size);
407        // Iterate once over all changes in the same order as the updates have been added
408        let total_nr_updates = u.len()?;
409        progress_callback(&format!("applying {} atomic updates", total_nr_updates));
410        for (nr_updates, update_event) in u.iter()?.enumerate() {
411            let (id, change) = update_event?;
412            trace!("applying event {:?}", &change);
413            ComponentType::before_update_event(&change, self, &mut update_graph_index)?;
414            match &change {
415                UpdateEvent::AddNode {
416                    node_name,
417                    node_type,
418                } => {
419                    // only add node if it does not exist yet
420                    if !self.node_annos.has_node_name(node_name)? {
421                        let new_node_id: NodeID =
422                            if let Some(id) = self.node_annos.get_largest_item()? {
423                                id + 1
424                            } else {
425                                0
426                            };
427
428                        let new_anno_name = Annotation {
429                            key: NODE_NAME_KEY.as_ref().clone(),
430                            val: node_name.into(),
431                        };
432                        let new_anno_type = Annotation {
433                            key: NODE_TYPE_KEY.as_ref().clone(),
434                            val: node_type.into(),
435                        };
436
437                        // add the new node (with minimum labels)
438                        self.node_annos.insert(new_node_id, new_anno_name)?;
439                        self.node_annos.insert(new_node_id, new_anno_type)?;
440
441                        // update the internal cache
442                        node_id_cache.put(node_name.clone(), Some(new_node_id));
443                    }
444                }
445                UpdateEvent::DeleteNode { node_name } => {
446                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
447                        Cow::Borrowed(node_name),
448                        &mut node_id_cache,
449                    )? {
450                        // delete all annotations
451                        self.node_annos.remove_item(&existing_node_id)?;
452
453                        // delete all edges pointing to this node either as source or target
454                        for c in all_components.iter() {
455                            if let Ok(gs) = self.get_or_create_writable(c) {
456                                gs.delete_node(existing_node_id)?;
457                            }
458                        }
459
460                        // update the internal cache
461                        node_id_cache.put(node_name.clone(), None);
462                    }
463                }
464                UpdateEvent::AddNodeLabel {
465                    node_name,
466                    anno_ns,
467                    anno_name,
468                    anno_value,
469                } => {
470                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
471                        Cow::Borrowed(node_name),
472                        &mut node_id_cache,
473                    )? {
474                        let anno = Annotation {
475                            key: AnnoKey {
476                                ns: anno_ns.into(),
477                                name: anno_name.into(),
478                            },
479                            val: anno_value.into(),
480                        };
481                        self.node_annos.insert(existing_node_id, anno)?;
482                    }
483                }
484                UpdateEvent::DeleteNodeLabel {
485                    node_name,
486                    anno_ns,
487                    anno_name,
488                } => {
489                    if let Some(existing_node_id) = self.get_cached_node_id_from_name(
490                        Cow::Borrowed(node_name),
491                        &mut node_id_cache,
492                    )? {
493                        let key = AnnoKey {
494                            ns: anno_ns.into(),
495                            name: anno_name.into(),
496                        };
497                        self.node_annos
498                            .remove_annotation_for_item(&existing_node_id, &key)?;
499                    }
500                }
501                UpdateEvent::AddEdge {
502                    source_node,
503                    target_node,
504                    layer,
505                    component_type,
506                    component_name,
507                } => {
508                    let source = self.get_cached_node_id_from_name(
509                        Cow::Borrowed(source_node),
510                        &mut node_id_cache,
511                    )?;
512                    let target = self.get_cached_node_id_from_name(
513                        Cow::Borrowed(target_node),
514                        &mut node_id_cache,
515                    )?;
516                    // only add edge if both nodes already exist
517                    if let (Some(source), Some(target)) = (source, target) {
518                        if let Ok(ctype) = CT::from_str(component_type) {
519                            let c = Component::new(ctype, layer.into(), component_name.into());
520                            let gs = self.get_or_create_writable(&c)?;
521                            gs.add_edge(Edge { source, target })?;
522                        }
523                    }
524                }
525                UpdateEvent::DeleteEdge {
526                    source_node,
527                    target_node,
528                    layer,
529                    component_type,
530                    component_name,
531                } => {
532                    let source = self.get_cached_node_id_from_name(
533                        Cow::Borrowed(source_node),
534                        &mut node_id_cache,
535                    )?;
536                    let target = self.get_cached_node_id_from_name(
537                        Cow::Borrowed(target_node),
538                        &mut node_id_cache,
539                    )?;
540                    if let (Some(source), Some(target)) = (source, target) {
541                        if let Ok(ctype) = CT::from_str(component_type) {
542                            let c = Component::new(ctype, layer.into(), component_name.into());
543
544                            let gs = self.get_or_create_writable(&c)?;
545                            gs.delete_edge(&Edge { source, target })?;
546                        }
547                    }
548                }
549                UpdateEvent::AddEdgeLabel {
550                    source_node,
551                    target_node,
552                    layer,
553                    component_type,
554                    component_name,
555                    anno_ns,
556                    anno_name,
557                    anno_value,
558                } => {
559                    let source = self.get_cached_node_id_from_name(
560                        Cow::Borrowed(source_node),
561                        &mut node_id_cache,
562                    )?;
563                    let target = self.get_cached_node_id_from_name(
564                        Cow::Borrowed(target_node),
565                        &mut node_id_cache,
566                    )?;
567                    if let (Some(source), Some(target)) = (source, target) {
568                        if let Ok(ctype) = CT::from_str(component_type) {
569                            let c = Component::new(ctype, layer.into(), component_name.into());
570                            let gs = self.get_or_create_writable(&c)?;
571                            // only add label if the edge already exists
572                            let e = Edge { source, target };
573                            if gs.is_connected(source, target, 1, Included(1))? {
574                                let anno = Annotation {
575                                    key: AnnoKey {
576                                        ns: anno_ns.into(),
577                                        name: anno_name.into(),
578                                    },
579                                    val: anno_value.into(),
580                                };
581                                gs.add_edge_annotation(e, anno)?;
582                            }
583                        }
584                    }
585                }
586                UpdateEvent::DeleteEdgeLabel {
587                    source_node,
588                    target_node,
589                    layer,
590                    component_type,
591                    component_name,
592                    anno_ns,
593                    anno_name,
594                } => {
595                    let source = self.get_cached_node_id_from_name(
596                        Cow::Borrowed(source_node),
597                        &mut node_id_cache,
598                    )?;
599                    let target = self.get_cached_node_id_from_name(
600                        Cow::Borrowed(target_node),
601                        &mut node_id_cache,
602                    )?;
603                    if let (Some(source), Some(target)) = (source, target) {
604                        if let Ok(ctype) = CT::from_str(component_type) {
605                            let c = Component::new(ctype, layer.into(), component_name.into());
606                            let gs = self.get_or_create_writable(&c)?;
607                            // only add label if the edge already exists
608                            let e = Edge { source, target };
609                            if gs.is_connected(source, target, 1, Included(1))? {
610                                let key = AnnoKey {
611                                    ns: anno_ns.into(),
612                                    name: anno_name.into(),
613                                };
614                                gs.delete_edge_annotation(&e, &key)?;
615                            }
616                        }
617                    }
618                }
619            } // end match update entry type
620            ComponentType::after_update_event(change, self, &mut update_graph_index)?;
621            self.current_change_id = id;
622
623            if nr_updates > 0 && nr_updates % 100_000 == 0 {
624                // Get progress in percentage
625                let progress = ((nr_updates as f64) / (total_nr_updates as f64)) * 100.0;
626                progress_callback(&format!(
627                    "applied {:.2}% of the atomic updates ({}/{})",
628                    progress, nr_updates, total_nr_updates,
629                ));
630            }
631        } // end for each consistent update entry
632
633        if update_statistics {
634            progress_callback("calculating all statistics");
635            self.calculate_all_statistics()?;
636        }
637
638        progress_callback("extending graph with model-specific index");
639        ComponentType::apply_update_graph_index(update_graph_index, self)?;
640
641        Ok(())
642    }
643
644    /// Apply a sequence of updates (`u` parameter) to this graph.
645    /// If the graph has a location on the disk, the changes are persisted.
646    pub fn apply_update<F>(&mut self, u: &mut GraphUpdate, progress_callback: F) -> Result<()>
647    where
648        F: Fn(&str),
649    {
650        progress_callback("applying list of atomic updates");
651
652        // we have to make sure that the corpus is fully loaded (with all components) before we can apply the update.
653        self.ensure_loaded_all()?;
654
655        let result = self.apply_update_in_memory(u, true, &progress_callback);
656        progress_callback("memory updates completed, persisting updates to disk");
657        self.persist_updates(u, result, progress_callback)?;
658        Ok(())
659    }
660
661    /// Apply a sequence of updates (`u` parameter) to this graph but do not update the graph statistics.
662    /// If the graph has a location on the disk, the changes are persisted.
663    pub fn apply_update_keep_statistics<F>(
664        &mut self,
665        u: &mut GraphUpdate,
666        progress_callback: F,
667    ) -> Result<()>
668    where
669        F: Fn(&str),
670    {
671        progress_callback("applying list of atomic updates");
672
673        // we have to make sure that the corpus is fully loaded (with all components) before we can apply the update.
674        self.ensure_loaded_all()?;
675
676        let result = self.apply_update_in_memory(u, false, &progress_callback);
677        progress_callback("memory updates completed, persisting updates to disk");
678        self.persist_updates(u, result, progress_callback)?;
679        Ok(())
680    }
681
682    fn persist_updates<F>(
683        &mut self,
684        u: &mut GraphUpdate,
685        apply_update_result: Result<()>,
686        progress_callback: F,
687    ) -> Result<()>
688    where
689        F: Fn(&str),
690    {
691        if let Some(location) = self.location.clone() {
692            trace!("output location for persisting updates is {:?}", location);
693            if apply_update_result.is_ok() {
694                let current_path = location.join("current");
695                // make sure the output path exits
696                std::fs::create_dir_all(&current_path)?;
697
698                // If successfull write log
699                let log_path = current_path.join("update_log.bin");
700
701                // Create a temporary directory in the same file system as the output
702                let temporary_dir = tempfile::tempdir_in(&current_path)?;
703                let mut temporary_disk_file = tempfile::NamedTempFile::new_in(&temporary_dir)?;
704
705                debug!("writing WAL update log to {:?}", temporary_disk_file.path());
706                bincode::serialize_into(temporary_disk_file.as_file(), &u)?;
707                temporary_disk_file.flush()?;
708                debug!("moving finished WAL update log to {:?}", &log_path);
709                // Since the temporary file should be on the same file system, persisting/moving it should be an atomic operation
710                temporary_disk_file.persist(&log_path)?;
711
712                progress_callback("finished writing WAL update log");
713            } else {
714                trace!(
715                    "error occured while applying updates: {:?}",
716                    &apply_update_result
717                );
718                // load corpus from disk again
719                self.open(&location)?;
720                self.ensure_loaded_all()?;
721            }
722        }
723
724        apply_update_result
725    }
726
727    /// A function to persist the changes of a write-ahead-log update on the disk. Should be run in a background thread.
728    pub fn background_sync_wal_updates(&self) -> Result<()> {
729        // TODO: friendly abort any currently running thread
730
731        if let Some(ref location) = self.location {
732            // Acquire lock, so that only one thread can write background data at the same time
733            let _lock = self.background_persistance.lock()?;
734
735            self.internal_save_with_backup(location)?;
736        }
737
738        Ok(())
739    }
740
741    /// Save this graph to the given location using a temporary backup folder for the old graph.
742    /// The backup folder is used to achieve some atomicity in combination with the `load_from` logic,
743    // which will load the backup folder in case saving the corpus to the "current" location was aborted.
744    fn internal_save_with_backup(&self, location: &Path) -> Result<()> {
745        // Move the old corpus to the backup sub-folder. When the corpus is loaded again and there is backup folder
746        // the backup will be used instead of the original possible corrupted files.
747        // The current version is only the real one if no backup folder exists. If there is a backup folder
748        // there is nothing to do since the backup already contains the last consistent version.
749        // A sub-folder is used to ensure that all directories are on the same file system and moving (instead of copying)
750        // is possible.
751        let backup_location = location.join("backup");
752        let current_location = location.join("current");
753        if !backup_location.exists() {
754            std::fs::rename(&current_location, &backup_location)?;
755        }
756
757        // Save the complete corpus without the write log to the target location
758        self.internal_save(&current_location)?;
759
760        // rename backup folder (renaming is atomic and deleting could leave an incomplete backup folder on disk)
761        let tmp_dir = tempfile::Builder::new()
762            .prefix("temporary-graphannis-backup")
763            .tempdir_in(location)?;
764        // the target directory is created and can cause issues on windows: delete it first
765        std::fs::remove_dir(tmp_dir.path())?;
766        std::fs::rename(&backup_location, tmp_dir.path())?;
767        // remove it after renaming it, (since the new "current" folder was completely written)
768        tmp_dir.close()?;
769        Ok(())
770    }
771
772    fn ensure_writeable(&mut self, c: &Component<CT>) -> Result<()> {
773        self.ensure_loaded(c)?;
774        // Short path: exists and already writable
775        if let Some(gs_opt) = self.components.get_mut(c) {
776            // This should always work, since we just ensured the component is loaded
777            let gs = gs_opt
778                .as_mut()
779                .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
780            // copy to writable implementation if needed
781            let is_writable = {
782                Arc::get_mut(gs)
783                    .ok_or_else(|| {
784                        GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string())
785                    })?
786                    .as_writeable()
787                    .is_some()
788            };
789            if is_writable {
790                return Ok(());
791            }
792        } else {
793            // Component does not exist at all, we can abort here
794            return Ok(());
795        }
796
797        // Component does exist, but is not writable, replace with writeable implementation
798        let readonly_gs = self
799            .components
800            .get(c)
801            .cloned()
802            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
803            .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
804        let writable_gs = registry::create_writeable(self, Some(readonly_gs.as_ref()))?;
805        self.components.insert(c.to_owned(), Some(writable_gs));
806
807        Ok(())
808    }
809
810    /// (Re-) calculate the internal statistics needed for estimating graph components and annotations.
811    pub fn calculate_all_statistics(&mut self) -> Result<()> {
812        self.ensure_loaded_all()?;
813
814        debug!("Calculating node statistics");
815        self.node_annos.calculate_statistics()?;
816        for c in self.get_all_components(None, None) {
817            debug!("Calculating statistics for component {}", &c);
818            self.calculate_component_statistics(&c)?;
819        }
820
821        debug!("Calculating global graph statistics");
822        CT::calculate_global_statistics(self)?;
823
824        Ok(())
825    }
826
827    /// Makes sure the statistics for the given component are up-to-date.
828    pub fn calculate_component_statistics(&mut self, c: &Component<CT>) -> Result<()> {
829        let mut result: Result<()> = Ok(());
830        let mut entry = self
831            .components
832            .remove(c)
833            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?;
834        if let Some(ref mut gs) = entry {
835            if let Some(gs_mut) = Arc::get_mut(gs) {
836                // Since immutable graph storages can't change, only writable graph storage statistics need to be re-calculated
837                if let Some(writeable_gs) = gs_mut.as_writeable() {
838                    writeable_gs.calculate_statistics()?;
839                }
840            } else {
841                result = Err(GraphAnnisCoreError::NonExclusiveComponentReference(
842                    c.to_string(),
843                ));
844            }
845        }
846        // re-insert component entry
847        self.components.insert(c.clone(), entry);
848        result
849    }
850
851    /// Gets the the given component.
852    /// If the component does not exist yet, it creates a  new empty one.
853    /// If the existing component is non-writable, a writable copy of it is created and returned.
854    pub fn get_or_create_writable(
855        &mut self,
856        c: &Component<CT>,
857    ) -> Result<&mut dyn WriteableGraphStorage> {
858        if self.components.contains_key(c) {
859            // make sure the component is actually writable and loaded
860            self.ensure_writeable(c)?;
861        } else {
862            let w = registry::create_writeable(self, None)?;
863
864            self.components.insert(c.clone(), Some(w));
865        }
866
867        // get and return the reference to the entry
868        let entry: &mut Arc<dyn GraphStorage> = self
869            .components
870            .get_mut(c)
871            .ok_or_else(|| GraphAnnisCoreError::MissingComponent(c.to_string()))?
872            .as_mut()
873            .ok_or_else(|| GraphAnnisCoreError::ComponentNotLoaded(c.to_string()))?;
874
875        let gs_mut_ref: &mut dyn GraphStorage = Arc::get_mut(entry)
876            .ok_or_else(|| GraphAnnisCoreError::NonExclusiveComponentReference(c.to_string()))?;
877        let result = gs_mut_ref
878            .as_writeable()
879            .ok_or_else(|| GraphAnnisCoreError::ReadOnlyComponent(c.to_string()))?;
880        Ok(result)
881    }
882
883    /// Returns `true` if the graph storage for this specific component is loaded and ready to use.
884    pub fn is_loaded(&self, c: &Component<CT>) -> bool {
885        let entry: Option<&Option<Arc<dyn GraphStorage>>> = self.components.get(c);
886        if let Some(gs_opt) = entry {
887            if gs_opt.is_some() {
888                return true;
889            }
890        }
891        false
892    }
893
894    /// Ensure that the graph storages for all component are loaded and ready to use.
895    pub fn ensure_loaded_all(&mut self) -> Result<()> {
896        let mut components_to_load: Vec<_> = Vec::with_capacity(self.components.len());
897
898        // colllect all missing components
899        for (c, gs) in &self.components {
900            if gs.is_none() {
901                components_to_load.push(c.clone());
902            }
903        }
904
905        self.ensure_loaded_parallel(&components_to_load)?;
906        Ok(())
907    }
908
909    /// Ensure that the graph storage for a specific component is loaded and ready to use.
910    pub fn ensure_loaded(&mut self, c: &Component<CT>) -> Result<()> {
911        // We only load known components, so check the map if the entry exists
912        if let Some(gs_opt) = self.components.get_mut(c) {
913            // If this is none, the component is known but not loaded
914            if gs_opt.is_none() {
915                let component_path = component_path(&self.location, c)
916                    .ok_or(GraphAnnisCoreError::EmptyComponentPath)?;
917                debug!(
918                    "loading component {} from {}",
919                    c,
920                    &component_path.to_string_lossy()
921                );
922                let component = load_component_from_disk(&component_path)?;
923                gs_opt.get_or_insert_with(|| component);
924            }
925        }
926        Ok(())
927    }
928
929    /// Ensure that the graph storage for a the given component is loaded and ready to use.
930    /// Loading is done in parallel.
931    ///
932    /// Returns the list of actually loaded (and existing) components.
933    pub fn ensure_loaded_parallel(
934        &mut self,
935        components_to_load: &[Component<CT>],
936    ) -> Result<Vec<Component<CT>>> {
937        // We only load known components, so check the map if the entry exists
938        // and that is not loaded yet.
939        let components_to_load: Vec<_> = components_to_load
940            .iter()
941            .filter(|c| {
942                if let Some(e) = self.components.get(c) {
943                    e.is_none()
944                } else {
945                    false
946                }
947            })
948            .collect();
949
950        // load missing components in parallel
951        let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
952            .into_par_iter()
953            .map(|c| match component_path(&self.location, c) {
954                Some(cpath) => {
955                    debug!(
956                        "loading component in parallel {} from {}",
957                        c,
958                        &cpath.to_string_lossy()
959                    );
960                    (c, load_component_from_disk(&cpath))
961                }
962                None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
963            })
964            .collect();
965
966        // insert all the loaded components
967        let mut result = Vec::with_capacity(loaded_components.len());
968        for (c, gs) in loaded_components {
969            let gs = gs?;
970            self.components.insert(c.clone(), Some(gs));
971            result.push(c.clone());
972        }
973        Ok(result)
974    }
975
976    pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
977        self.ensure_loaded_all()?;
978
979        if self.disk_based != disk_based {
980            self.disk_based = disk_based;
981
982            // Change the node annotation implementation
983            let mut new_node_annos: Box<dyn NodeAnnotationStorage> = if disk_based {
984                Box::new(crate::annostorage::ondisk::AnnoStorageImpl::new(None)?)
985            } else {
986                Box::new(crate::annostorage::inmemory::AnnoStorageImpl::<NodeID>::new())
987            };
988
989            // Copy all annotations for all nodes
990            info!("copying node annotations");
991            for m in self
992                .node_annos
993                .exact_anno_search(Some(ANNIS_NS), NODE_TYPE, ValueSearch::Any)
994            {
995                let m = m?;
996                for anno in self.node_annos.get_annotations_for_item(&m.node)? {
997                    new_node_annos.insert(m.node, anno)?;
998                }
999            }
1000            self.node_annos = new_node_annos;
1001        }
1002
1003        info!("re-calculating all statistics");
1004        self.calculate_all_statistics()?;
1005
1006        for c in self.get_all_components(None, None) {
1007            // Perform the optimization if necessary
1008            info!("optimizing implementation for component {}", &c);
1009            self.optimize_gs_impl(&c)?;
1010        }
1011        if let Some(location) = &self.location {
1012            info!("saving corpus to disk");
1013            self.internal_save_with_backup(location)?;
1014        }
1015        Ok(())
1016    }
1017
1018    pub fn optimize_gs_impl(&mut self, c: &Component<CT>) -> Result<()> {
1019        if let Some(gs) = self.get_graphstorage(c) {
1020            if let Some(stats) = gs.get_statistics() {
1021                let opt_info = registry::get_optimal_impl_heuristic(self, stats);
1022
1023                // convert if necessary
1024                if opt_info.id != gs.serialization_id() {
1025                    let mut new_gs = registry::create_from_info(&opt_info)?;
1026                    let converted = if let Some(new_gs_mut) = Arc::get_mut(&mut new_gs) {
1027                        info!(
1028                            "converting component {} to implementation {}",
1029                            c, opt_info.id,
1030                        );
1031                        new_gs_mut.copy(self.get_node_annos(), gs.as_ref())?;
1032                        true
1033                    } else {
1034                        false
1035                    };
1036                    if converted {
1037                        // insert into components map
1038                        info!(
1039                            "finished conversion of component {} to implementation {}",
1040                            c, opt_info.id,
1041                        );
1042                        self.components.insert(c.clone(), Some(new_gs.clone()));
1043                    }
1044                }
1045            }
1046        }
1047
1048        Ok(())
1049    }
1050
1051    /// Get a read-only graph storage copy for the given component `c`.
1052    pub fn get_graphstorage(&self, c: &Component<CT>) -> Option<Arc<dyn GraphStorage>> {
1053        // get and return the reference to the entry if loaded
1054        let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1055        Some(entry.clone())
1056    }
1057
1058    /// Get a read-only graph storage reference for the given component `c`.
1059    pub fn get_graphstorage_as_ref<'a>(
1060        &'a self,
1061        c: &Component<CT>,
1062    ) -> Option<&'a dyn GraphStorage> {
1063        // get and return the reference to the entry if loaded
1064        let entry: &Arc<dyn GraphStorage> = self.components.get(c)?.as_ref()?;
1065        Some(entry.as_ref())
1066    }
1067
1068    /// Get a read-only reference to the node annotations of this graph
1069    pub fn get_node_annos(&self) -> &dyn NodeAnnotationStorage {
1070        self.node_annos.as_ref()
1071    }
1072
1073    /// Get a mutable reference to the node annotations of this graph
1074    pub fn get_node_annos_mut(&mut self) -> &mut dyn NodeAnnotationStorage {
1075        self.node_annos.as_mut()
1076    }
1077
1078    /// Returns all components of the graph given an optional type (`ctype`) and `name`.
1079    /// This allows to filter which components to receive.
1080    /// If you want to retrieve all components, use `None` as value for both arguments.
1081    pub fn get_all_components(&self, ctype: Option<CT>, name: Option<&str>) -> Vec<Component<CT>> {
1082        if let (Some(ctype), Some(name)) = (&ctype, name) {
1083            // lookup component from sorted map
1084            let mut result: Vec<_> = Vec::new();
1085            let ckey = Component::new(ctype.clone(), SmartString::default(), name.into());
1086
1087            for (c, _) in self.components.range(ckey..) {
1088                if c.name != name || &c.get_type() != ctype {
1089                    break;
1090                }
1091                result.push(c.clone());
1092            }
1093            result
1094        } else if let Some(ctype) = &ctype {
1095            // lookup component from sorted map
1096            let mut result: Vec<_> = Vec::new();
1097            let ckey = Component::new(
1098                ctype.clone(),
1099                SmartString::default(),
1100                SmartString::default(),
1101            );
1102
1103            for (c, _) in self.components.range(ckey..) {
1104                if &c.get_type() != ctype {
1105                    break;
1106                }
1107                result.push(c.clone());
1108            }
1109            result
1110        } else {
1111            // filter all entries
1112            let filtered_components = self
1113                .components
1114                .keys()
1115                .filter(move |c| {
1116                    if let Some(ctype) = ctype.clone() {
1117                        if ctype != c.get_type() {
1118                            return false;
1119                        }
1120                    }
1121                    if let Some(name) = name {
1122                        if name != c.name {
1123                            return false;
1124                        }
1125                    }
1126                    true
1127                })
1128                .cloned();
1129            filtered_components.collect()
1130        }
1131    }
1132}
1133
1134#[cfg(test)]
1135mod tests;