cellular_raza_core/storage/
concepts.rs

1use std::collections::{BTreeMap, HashMap};
2use std::error::Error;
3use std::fmt::Display;
4
5use serde::{Deserialize, Serialize};
6use uniquevec::UniqueVec;
7
8#[cfg(feature = "tracing")]
9use tracing::instrument;
10
11use super::memory_storage::MemoryStorageInterface;
12use super::ron::RonStorageInterface;
13use super::serde_json::JsonStorageInterface;
14use super::sled_database::SledStorageInterface;
15
/// Error related to storing and reading elements.
///
/// Apart from [StorageError::InitError], every variant wraps the error value returned by an
/// underlying library or standard-library operation.
#[derive(Debug)]
pub enum StorageError {
    /// Error related to File Io operations.
    IoError(std::io::Error),
    /// Occurs during (de)serialization of json structs with [serde_json].
    SerdeJsonError(serde_json::Error),
    /// Generic error related to serialization in the [ron] crate.
    RonError(ron::Error),
    /// Generic error related to deserialization in the [ron] crate.
    RonSpannedError(ron::error::SpannedError),
    /// Generic error related to the [sled] database.
    SledError(sled::Error),
    /// Generic serialization error thrown by the [bincode] library.
    SerializeError(Box<bincode::ErrorKind>),
    /// Initialization error mainly used for initialization of databases such as [sled].
    /// Also raised when a storage backend is used that was never initialized.
    InitError(String),
    /// Error when parsing file/folder names.
    ParseIntError(std::num::ParseIntError),
    /// Generic Utf8 error.
    Utf8Error(std::str::Utf8Error),
}
38
39impl From<serde_json::Error> for StorageError {
40    fn from(err: serde_json::Error) -> Self {
41        StorageError::SerdeJsonError(err)
42    }
43}
44
45impl From<ron::error::SpannedError> for StorageError {
46    fn from(err: ron::error::SpannedError) -> Self {
47        StorageError::RonSpannedError(err)
48    }
49}
50
51impl From<ron::Error> for StorageError {
52    fn from(err: ron::Error) -> Self {
53        StorageError::RonError(err)
54    }
55}
56
57impl From<sled::Error> for StorageError {
58    fn from(err: sled::Error) -> Self {
59        StorageError::SledError(err)
60    }
61}
62
63impl From<Box<bincode::ErrorKind>> for StorageError {
64    fn from(err: Box<bincode::ErrorKind>) -> Self {
65        StorageError::SerializeError(err)
66    }
67}
68
69impl From<std::io::Error> for StorageError {
70    fn from(err: std::io::Error) -> Self {
71        StorageError::IoError(err)
72    }
73}
74
75impl From<std::str::Utf8Error> for StorageError {
76    fn from(err: std::str::Utf8Error) -> Self {
77        StorageError::Utf8Error(err)
78    }
79}
80
81impl From<std::num::ParseIntError> for StorageError {
82    fn from(err: std::num::ParseIntError) -> Self {
83        StorageError::ParseIntError(err)
84    }
85}
86
87impl Display for StorageError {
88    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
89        match self {
90            StorageError::SerdeJsonError(message) => write!(f, "{}", message),
91            StorageError::RonError(message) => write!(f, "{}", message),
92            StorageError::RonSpannedError(message) => write!(f, "{}", message),
93            StorageError::SledError(message) => write!(f, "{}", message),
94            StorageError::SerializeError(message) => write!(f, "{}", message),
95            StorageError::IoError(message) => write!(f, "{}", message),
96            StorageError::InitError(message) => write!(f, "{}", message),
97            StorageError::Utf8Error(message) => write!(f, "{}", message),
98            StorageError::ParseIntError(message) => write!(f, "{}", message),
99        }
100    }
101}
102
103impl Error for StorageError {}
104
/// Define how to store results of the simulation.
///
/// We currently support saving results in a [sled] database (persistent or temporary), as
/// json files by using [serde_json], as [ron] files, or in memory.
#[cfg_attr(feature = "pyo3", pyo3::pyclass)]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum StorageOption {
    /// Save results as [sled] database.
    Sled,
    /// Save results as [sled] database but remove them when dropping the struct
    SledTemp,
    /// Save results as [json](https://www.json.org/json-en.html) file.
    SerdeJson,
    /// Store results in the [ron] file format specifically designed for Rust structs.
    /// This format guarantees round-trips `Rust -> Ron -> Rust` and is thus preferred together
    /// with the well-established [StorageOption::SerdeJson] format.
    Ron,
    /// A [HashMap](std::collections::HashMap) based in-memory storage.
    Memory,
}
125
126impl StorageOption {
127    /// Which storage option should be used by default.
128    pub fn default_priority() -> UniqueVec<Self> {
129        vec![
130            StorageOption::SerdeJson,
131            // TODO fix sled! This is currently not working on multiple threads
132            // StorageOptions::Sled,
133        ]
134        .into()
135    }
136}
137
/// Define how elements and identifiers are saved when being serialized together.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CombinedSaveFormat<Id, Element> {
    /// Identifier of the element
    pub identifier: Id,
    /// Actual element which is being stored
    pub element: Element,
}

/// Define how batches of elements and identifiers are saved when being serialized.
///
/// A batch is a list of [CombinedSaveFormat] entries that is written to a single file.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BatchSaveFormat<Id, Element> {
    /// Identifier/element pairs contained in this batch.
    pub(super) data: Vec<CombinedSaveFormat<Id, Element>>,
}
152
/// This manager handles the case where multiple storage options have been specified.
///
/// Storing writes to every backend that has been initialized. Loading reads only from the
/// backend that comes first in the storage priority.
#[derive(Clone, Debug)]
pub struct StorageManager<Id, Element> {
    /// Ordered list of backends requested through the [StorageBuilder].
    storage_priority: UniqueVec<StorageOption>,
    /// The initialized builder this manager was created from.
    builder: StorageBuilder<true>,
    /// Instance number handed to every backend when it is opened.
    instance: u64,

    // One optional slot per backend; a slot is `None` when that backend was not requested.
    sled_storage: Option<SledStorageInterface<Id, Element>>,
    sled_temp_storage: Option<SledStorageInterface<Id, Element, true>>,
    json_storage: Option<StorageWrapper<JsonStorageInterface<Id, Element>>>,
    ron_storage: Option<StorageWrapper<RonStorageInterface<Id, Element>>>,
    memory_storage: Option<MemoryStorageInterface<Id, Element>>,
}
167
/// Used to construct a [StorageManager]
///
/// This builder contains multiple options which can be used to configure the location and type in
/// which results are stored.
/// To get an overview over all possible options, we refer to the [module](crate::storage)
/// documentation.
///
/// The const parameter `INIT` becomes `true` once [StorageBuilder::init] has been called.
///
/// ```
/// use cellular_raza_core::storage::{StorageBuilder, StorageOption};
///
/// let storage_priority = StorageOption::default_priority();
/// let storage_builder = StorageBuilder::new()
///     .priority(storage_priority)
///     .location("./");
/// ```
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StorageBuilder<const INIT: bool = false> {
    /// Base folder in which results are stored.
    location: std::path::PathBuf,
    /// Ordered list of storage backends to use.
    priority: UniqueVec<StorageOption>,
    /// Path appended after the (optional) date folder.
    suffix: std::path::PathBuf,
    /// Whether a date folder is inserted between `location` and `suffix`.
    #[cfg(feature = "timestamp")]
    add_date: bool,
    /// Date folder; filled in when the builder is initialized.
    #[cfg(feature = "timestamp")]
    date: std::path::PathBuf,
}
193
194impl<const INIT: bool> StorageBuilder<INIT> {
195    /// Define the priority of [StorageOption]. See [StorageOption::default_priority].
196    pub fn priority(self, priority: impl IntoIterator<Item = StorageOption>) -> Self {
197        let (priority, _) = UniqueVec::from_iter(priority);
198        Self { priority, ..self }
199    }
200
201    /// Get the current priority
202    pub fn get_priority(&self) -> UniqueVec<StorageOption> {
203        self.priority.clone()
204    }
205
206    /// Define a suffix which will be appended to the save path
207    pub fn suffix(self, suffix: impl Into<std::path::PathBuf>) -> Self {
208        Self {
209            suffix: suffix.into(),
210            ..self
211        }
212    }
213
214    /// Get the current suffix
215    pub fn get_suffix(&self) -> std::path::PathBuf {
216        self.suffix.clone()
217    }
218
219    /// Store results by their current date inside the specified folder path
220    #[cfg(feature = "timestamp")]
221    pub fn add_date(self, add_date: bool) -> Self {
222        Self { add_date, ..self }
223    }
224
225    /// Get information if the current date should be appended to the storage path
226    #[cfg(feature = "timestamp")]
227    pub fn get_add_date(&self) -> bool {
228        self.add_date
229    }
230}
231
232impl StorageBuilder<false> {
233    /// Constructs a new [StorageBuilder] with default settings.
234    ///
235    /// ```
236    /// use cellular_raza_core::storage::StorageBuilder;
237    /// let storage_builder = StorageBuilder::new();
238    /// ```
239    #[cfg_attr(feature = "tracing", instrument(skip_all))]
240    pub fn new() -> Self {
241        Self {
242            location: "./out".into(),
243            priority: UniqueVec::from_iter([StorageOption::SerdeJson]).0,
244            suffix: "".into(),
245            #[cfg(feature = "timestamp")]
246            add_date: true,
247            #[cfg(feature = "timestamp")]
248            date: "".into(),
249        }
250    }
251
252    /// Initializes the [StorageBuilder] thus filling information about time.
253    #[cfg_attr(feature = "tracing", instrument(skip_all))]
254    pub fn init(self) -> StorageBuilder<true> {
255        #[cfg(feature = "timestamp")]
256        let date: std::path::PathBuf = if self.add_date {
257            format!("{}", chrono::Local::now().format("%Y-%m-%d-T%H-%M-%S")).into()
258        } else {
259            "".into()
260        };
261        self.init_with_date(&date)
262    }
263
264    /// Specify the time at which the results should be saved
265    #[cfg_attr(feature = "tracing", instrument(skip_all))]
266    pub fn init_with_date(self, date: &std::path::Path) -> StorageBuilder<true> {
267        StorageBuilder::<true> {
268            location: self.location,
269            priority: self.priority,
270            suffix: self.suffix,
271            #[cfg(feature = "timestamp")]
272            add_date: self.add_date,
273            #[cfg(feature = "timestamp")]
274            date: date.into(),
275        }
276    }
277
278    /// Define a folder where to store results
279    ///
280    /// Note that this functionality is only available as long as the [StorageBuilder] has not been
281    /// initialized.
282    pub fn location<P>(self, location: P) -> Self
283    where
284        std::path::PathBuf: From<P>,
285    {
286        Self {
287            location: location.into(),
288            ..self
289        }
290    }
291
292    /// Get the current storage_location
293    ///
294    /// Note that this functionality is only available as long as the [StorageBuilder] has not been
295    /// initialized.
296    pub fn get_location(&self) -> std::path::PathBuf {
297        self.location.clone()
298    }
299}
300
301impl StorageBuilder<true> {
302    /// Get the fully constructed path after the Builder has been initialized with the
303    /// [StorageBuilder::init] function.
304    #[cfg_attr(feature = "tracing", instrument(skip_all))]
305    pub fn get_full_path(&self) -> std::path::PathBuf {
306        let mut full_path = self.location.clone();
307        #[cfg(feature = "timestamp")]
308        if self.add_date {
309            full_path.extend(&self.date);
310        }
311        full_path.extend(&self.suffix);
312        full_path
313    }
314
315    #[doc(hidden)]
316    pub fn init(self) -> Self {
317        self
318    }
319
320    /// De-initializes the StorageBuilder, making it possible to edit it again.
321    pub fn de_init(self) -> StorageBuilder<false> {
322        StorageBuilder {
323            location: self.location,
324            priority: self.priority,
325            suffix: self.suffix,
326            #[cfg(feature = "timestamp")]
327            add_date: self.add_date,
328            #[cfg(feature = "timestamp")]
329            date: "".into(),
330        }
331    }
332}
333
334impl<Id, Element> StorageManager<Id, Element> {
335    /// Constructs the [StorageManager] from the instance identifier
336    /// and the settings given by the [StorageBuilder].
337    ///
338    /// ```
339    /// use cellular_raza_core::storage::*;
340    /// let builder = StorageBuilder::new()
341    ///     .location("/tmp")
342    ///     .init();
343    ///
344    /// let manager = StorageManager::<usize, f64>::open_or_create(builder, 0)?;
345    /// # Ok::<(), StorageError>(())
346    /// ```
347    #[cfg_attr(feature = "tracing", instrument(skip_all))]
348    pub fn open_or_create(
349        storage_builder: StorageBuilder<true>,
350        instance: u64,
351    ) -> Result<Self, StorageError> {
352        let location = storage_builder.get_full_path();
353
354        let mut sled_storage = None;
355        let mut sled_temp_storage = None;
356        let mut json_storage = None;
357        let mut ron_storage = None;
358        let mut memory_storage = None;
359        for storage_variant in storage_builder.priority.iter() {
360            match storage_variant {
361                StorageOption::SerdeJson => {
362                    json_storage = Some(StorageWrapper(
363                        JsonStorageInterface::<Id, Element>::open_or_create(
364                            &location
365                                .to_path_buf()
366                                .join(JsonStorageInterface::<Id, Element>::EXTENSION),
367                            instance,
368                        )?,
369                    ));
370                }
371                StorageOption::Sled => {
372                    sled_storage =
373                        Some(SledStorageInterface::<Id, Element, false>::open_or_create(
374                            &location.to_path_buf().join("sled"),
375                            instance,
376                        )?);
377                }
378                StorageOption::SledTemp => {
379                    sled_temp_storage =
380                        Some(SledStorageInterface::<Id, Element, true>::open_or_create(
381                            &location.to_path_buf().join("sled_memory"),
382                            instance,
383                        )?);
384                }
385                StorageOption::Ron => {
386                    ron_storage = Some(StorageWrapper(
387                        RonStorageInterface::<Id, Element>::open_or_create(
388                            &location
389                                .to_path_buf()
390                                .join(RonStorageInterface::<Id, Element>::EXTENSION),
391                            instance,
392                        )?,
393                    ));
394                }
395                StorageOption::Memory => {
396                    memory_storage = Some(MemoryStorageInterface::<Id, Element>::open_or_create(
397                        &location.to_path_buf(),
398                        instance,
399                    )?);
400                }
401            }
402        }
403        let manager = StorageManager {
404            storage_priority: storage_builder.priority.clone(),
405            builder: storage_builder.clone(),
406            instance,
407
408            sled_storage,
409            sled_temp_storage,
410            json_storage,
411            ron_storage,
412            memory_storage,
413        };
414
415        Ok(manager)
416    }
417
418    /// Extracts all information given by the [StorageBuilder] when constructing
419    #[cfg_attr(feature = "tracing", instrument(skip_all))]
420    pub fn extract_builder(&self) -> StorageBuilder<true> {
421        self.builder.clone()
422    }
423
424    /// Get the instance of this object.
425    ///
426    /// These instances should not be overlapping, ie. there should not be two objects existing in
427    /// parallel with the same instance number.
428    pub fn get_instance(&self) -> u64 {
429        self.instance
430    }
431}
432
// Dispatches a storage method call to the backend fields of a `StorageManager`.
//
// * `@internal`: call `$function` on one backend. A backend that was not initialized is
//   turned into `StorageError::InitError` and propagated with `?`.
// * `mut`: call `$function` on one backend if it was initialized, otherwise do nothing.
// * `all mut`: call `$function` on every initialized backend in a fixed order
//   (sled, temporary sled, json, ron, memory), propagating the first error.
// * default: select the backend that corresponds to the `StorageOption` in `$priority`.
macro_rules! exec_for_all_storage_options(
    (@internal $self:ident, $storage_option:ident, $field:ident, $function:ident, $($args:tt)*) => {
        {
            if let Some($field) = &$self.$field {
                $field.$function($($args)*)
            } else {
                // `stringify!` alone would render the comma and the quoted literal verbatim.
                // `concat!` yields the intended "<Option> storage was not initialized ..." text.
                Err(StorageError::InitError(
                    concat!(
                        stringify!($storage_option),
                        " storage was not initialized but called"
                    )
                    .into(),
                ))?
            }
        }
    };
    (mut $self:ident, $field:ident, $function:ident, $($args:tt)*) => {
        if let Some($field) = &mut $self.$field {
            $field.$function($($args)*)?;
        }
    };
    (all mut $self:ident, $function:ident, $($args:tt)*) => {
        exec_for_all_storage_options!(mut $self, sled_storage, $function, $($args)*);
        exec_for_all_storage_options!(mut $self, sled_temp_storage, $function, $($args)*);
        exec_for_all_storage_options!(mut $self, json_storage, $function, $($args)*);
        exec_for_all_storage_options!(mut $self, ron_storage, $function, $($args)*);
        exec_for_all_storage_options!(mut $self, memory_storage, $function, $($args)*);
    };
    ($self:ident, $priority:ident, $function:ident, $($args:tt)*) => {
        match $priority {
            StorageOption::Sled => exec_for_all_storage_options!(
                @internal $self, Sled, sled_storage, $function, $($args)*
            ),
            StorageOption::SledTemp => exec_for_all_storage_options!(
                @internal $self, SledTemp, sled_temp_storage, $function, $($args)*
            ),
            StorageOption::SerdeJson => exec_for_all_storage_options!(
                @internal $self, SerdeJson, json_storage, $function, $($args)*
            ),
            StorageOption::Ron => exec_for_all_storage_options!(
                @internal $self, Ron, ron_storage, $function, $($args)*
            ),
            StorageOption::Memory => exec_for_all_storage_options!(
                @internal $self, Memory, memory_storage, $function, $($args)*
            ),
        }
    }
);
477
478impl<Id, Element> StorageInterfaceStore<Id, Element> for StorageManager<Id, Element>
479where
480    Id: core::hash::Hash + core::cmp::Eq + Clone,
481    Element: Clone,
482{
483    #[allow(unused)]
484    fn store_single_element(
485        &mut self,
486        iteration: u64,
487        identifier: &Id,
488        element: &Element,
489    ) -> Result<(), StorageError>
490    where
491        Id: Serialize,
492        Element: Serialize,
493    {
494        exec_for_all_storage_options!(
495            all mut self,
496            store_single_element,
497            iteration, identifier, element
498        );
499        Ok(())
500    }
501
502    #[allow(unused)]
503    fn store_batch_elements<'a, I>(
504        &'a mut self,
505        iteration: u64,
506        identifiers_elements: I,
507    ) -> Result<(), StorageError>
508    where
509        Id: 'a + Serialize,
510        Element: 'a + Serialize,
511        I: Clone + IntoIterator<Item = (&'a Id, &'a Element)>,
512    {
513        exec_for_all_storage_options!(
514            all mut self,
515            store_batch_elements,
516            iteration,
517            identifiers_elements.clone()
518        );
519        Ok(())
520    }
521}
522
523impl<Id, Element> StorageInterfaceLoad<Id, Element> for StorageManager<Id, Element>
524where
525    Id: core::hash::Hash + core::cmp::Eq + Clone,
526    Element: Clone,
527{
528    #[allow(unused)]
529    fn load_single_element(
530        &self,
531        iteration: u64,
532        identifier: &Id,
533    ) -> Result<Option<Element>, StorageError>
534    where
535        Id: Serialize + for<'a> Deserialize<'a>,
536        Element: for<'a> Deserialize<'a>,
537    {
538        for priority in self.storage_priority.iter() {
539            return exec_for_all_storage_options!(
540                self,
541                priority,
542                load_single_element,
543                iteration,
544                identifier
545            );
546        }
547        Ok(None)
548    }
549
550    #[allow(unused)]
551    fn load_all_elements_at_iteration(
552        &self,
553        iteration: u64,
554    ) -> Result<HashMap<Id, Element>, StorageError>
555    where
556        Id: std::hash::Hash + std::cmp::Eq + for<'a> Deserialize<'a>,
557        Element: for<'a> Deserialize<'a>,
558    {
559        for priority in self.storage_priority.iter() {
560            return exec_for_all_storage_options!(
561                self,
562                priority,
563                load_all_elements_at_iteration,
564                iteration
565            );
566        }
567        Ok(HashMap::new())
568    }
569
570    fn get_all_iterations(&self) -> Result<Vec<u64>, StorageError> {
571        for priority in self.storage_priority.iter() {
572            return exec_for_all_storage_options!(self, priority, get_all_iterations,);
573        }
574        Ok(Vec::new())
575    }
576}
577
/// The mode in which to generate paths and store results.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StorageMode {
    /// Save one element to a single file
    Single,
    /// Save many elements in one file.
    Batch,
}

impl StorageMode {
    /// File-name prefix used for this mode: `"single"` or `"batch"`.
    ///
    /// The returned string is a literal, so it is `'static` and not tied to `self`.
    fn to_str(self) -> &'static str {
        match self {
            Self::Single => "single",
            Self::Batch => "batch",
        }
    }
}
594
595/// Abstraction and simplification of many file-based storage solutions
596pub trait FileBasedStorage<Id, Element> {
597    /// The suffix which is used to distinguish this storage solution from others.
598    const EXTENSION: &'static str;
599
600    /// Get path where results are stored.
601    fn get_path(&self) -> &std::path::Path;
602
603    /// Get the number of this storage instance.
604    /// This value may coincide with the thread number.
605    fn get_storage_instance(&self) -> u64;
606
607    /// Writes either [BatchSaveFormat] or [CombinedSaveFormat] to the disk.
608    fn to_writer_pretty<V, W>(&self, writer: W, value: &V) -> Result<(), StorageError>
609    where
610        V: Serialize,
611        W: std::io::Write;
612
613    /// Deserializes the given value type from a reader.
614    fn from_reader<V, R>(&self, reader: R) -> Result<V, StorageError>
615    where
616        V: for<'a> Deserialize<'a>,
617        R: std::io::Read;
618
619    /// Creates a new iteration file with a predefined naming scheme.
620    ///
621    /// The path which to use is by default determined by the
622    /// [FileBasedStorage::get_iteration_save_path_batch_with_prefix] function.
623    fn create_or_get_iteration_file_with_prefix(
624        &self,
625        iteration: u64,
626        mode: StorageMode,
627    ) -> Result<std::io::BufWriter<std::fs::File>, StorageError> {
628        let save_path = self.get_iteration_save_path_batch_with_prefix(iteration, mode)?;
629
630        // Open+Create a file and wrap it inside a buffer writer
631        let file = std::fs::OpenOptions::new()
632            .read(true)
633            .write(true)
634            .create(true)
635            .open(&save_path)?;
636
637        Ok(std::io::BufWriter::new(file))
638    }
639
640    /// Get the path which holds saved entries if the given iteration.
641    ///
642    /// By default this function joins the path generated by [FileBasedStorage::get_path]
643    /// with a 0-delimited number according to the iteration number.
644    fn get_iteration_path(&self, iteration: u64) -> std::path::PathBuf {
645        self.get_path().join(format!("{:020.0}", iteration))
646    }
647
648    /// Creates the path used by the [FileBasedStorage::create_or_get_iteration_file_with_prefix]
649    /// function.
650    fn get_iteration_save_path_batch_with_prefix(
651        &self,
652        iteration: u64,
653        mode: StorageMode,
654    ) -> Result<std::path::PathBuf, StorageError> {
655        // First we get the folder path of the iteration
656        let iteration_path = self.get_iteration_path(iteration);
657        // If this folder does not exist, we create it
658        std::fs::create_dir_all(&iteration_path)?;
659
660        // Check if other batch files are already existing
661        // If this is the case increase the batch number until we find one where no batch is existing
662        let prefix = mode.to_str();
663        let create_save_path = |i: usize| -> std::path::PathBuf {
664            iteration_path
665                .join(format!(
666                    "{}_{:020.0}_{:020.0}",
667                    prefix,
668                    self.get_storage_instance(),
669                    i
670                ))
671                .with_extension(Self::EXTENSION)
672        };
673        let mut counter = 0;
674        let mut save_path;
675        while {
676            save_path = create_save_path(counter);
677            save_path.exists()
678        } {
679            counter += 1
680        }
681        Ok(save_path)
682    }
683
684    /// Converts a given path of a folder to a iteration number.
685    ///
686    /// This function is used for loading results
687    fn folder_name_to_iteration(
688        &self,
689        file: &std::path::Path,
690    ) -> Result<Option<u64>, StorageError> {
691        match file.file_stem() {
692            Some(filename) => match filename.to_str() {
693                Some(filename_string) => Ok(Some(filename_string.parse::<u64>()?)),
694                None => Ok(None),
695            },
696            None => Ok(None),
697        }
698    }
699}
700
/// Adapter that implements [StorageInterfaceStore] for any [FileBasedStorage] backend by
/// delegating to the wrapped value.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct StorageWrapper<T>(pub(crate) T);
703
704impl<T, Id, Element> StorageInterfaceStore<Id, Element> for StorageWrapper<T>
705where
706    T: FileBasedStorage<Id, Element>,
707{
708    fn store_batch_elements<'a, I>(
709        &'a mut self,
710        iteration: u64,
711        identifiers_elements: I,
712    ) -> Result<(), StorageError>
713    where
714        Id: 'a + Serialize,
715        Element: 'a + Serialize,
716        I: Clone + IntoIterator<Item = (&'a Id, &'a Element)>,
717    {
718        let iteration_file = self
719            .0
720            .create_or_get_iteration_file_with_prefix(iteration, StorageMode::Batch)?;
721        let batch = BatchSaveFormat {
722            data: identifiers_elements
723                .into_iter()
724                .map(|(id, element)| CombinedSaveFormat {
725                    identifier: id,
726                    element,
727                })
728                .collect(),
729        };
730        self.0.to_writer_pretty(iteration_file, &batch)?;
731        Ok(())
732    }
733
734    fn store_single_element(
735        &mut self,
736        iteration: u64,
737        identifier: &Id,
738        element: &Element,
739    ) -> Result<(), StorageError>
740    where
741        Id: Serialize,
742        Element: Serialize,
743    {
744        let iteration_file = self
745            .0
746            .create_or_get_iteration_file_with_prefix(iteration, StorageMode::Single)?;
747        let save_format = CombinedSaveFormat {
748            identifier,
749            element,
750        };
751        self.0.to_writer_pretty(iteration_file, &save_format)?;
752        Ok(())
753    }
754}
755
/// Open or create a new instance of the Storage controller.
pub trait StorageInterfaceOpen {
    /// Initializes the current storage device.
    ///
    /// In the case of databases, this may already result in an IO operation,
    /// while file-based formats such as json may create folders.
    ///
    /// `location` is the path the backend should use. `storage_instance` is the instance
    /// number of the caller.
    fn open_or_create(
        location: &std::path::Path,
        storage_instance: u64,
    ) -> Result<Self, StorageError>
    where
        Self: Sized;
}

/// Handles storing of elements
pub trait StorageInterfaceStore<Id, Element> {
    /// Saves a single element at given iteration.
    fn store_single_element(
        &mut self,
        iteration: u64,
        identifier: &Id,
        element: &Element,
    ) -> Result<(), StorageError>
    where
        Id: Serialize,
        Element: Serialize;

    /// Stores a batch of multiple elements with identifiers all at the same iteration.
    ///
    /// `I` must be `Clone` so that callers can hand the same batch to several backends.
    fn store_batch_elements<'a, I>(
        &'a mut self,
        iteration: u64,
        identifiers_elements: I,
    ) -> Result<(), StorageError>
    where
        Id: 'a + Serialize,
        Element: 'a + Serialize,
        I: Clone + IntoIterator<Item = (&'a Id, &'a Element)>;
}
794
795/// Handles loading of elements
796pub trait StorageInterfaceLoad<Id, Element> {
797    // TODO decide if these functions should be &mut self instead of &self
798    // This could be useful when implementing buffers, but maybe unnecessary.
799    /// Loads a single element from the storage solution if the element exists.
800    fn load_single_element(
801        &self,
802        iteration: u64,
803        identifier: &Id,
804    ) -> Result<Option<Element>, StorageError>
805    where
806        Id: Eq + Serialize + for<'a> Deserialize<'a>,
807        Element: for<'a> Deserialize<'a>;
808
809    /// Loads the elements history, meaning every occurrence of the element in the storage.
810    /// This function by default provides the results in ordered fashion such that the time
811    /// direction is retained.
812    /// Furthermore this function assumes that a given index occurs over the course of a complete
813    /// time segment with no interceptions.
814    /// ```
815    /// // All elements (given by Strings) occur over a period of time
816    /// // but do not appear afterwards.
817    /// use std::collections::HashMap;
818    /// let valid_state = HashMap::from([
819    ///     (0, vec!["E1", "E2", "E3"]),
820    ///     (1, vec!["E1", "E2", "E3", "E4"]),
821    ///     (2, vec!["E1", "E2", "E3", "E4"]),
822    ///     (3, vec!["E1", "E2", "E4"]),
823    ///     (4, vec!["E2", "E4"]),
824    ///     (5, vec!["E2", "E4", "E5"]),
825    ///     (6, vec!["E4", "E5"]),
826    /// ]);
827    /// // The entry "E2" is missing in iteration 1 but present afterwards.
828    /// // This is an invalid state but will not be caught.
829    /// // The backend is responsible to avoid this state.
830    /// let invalid_state = HashMap::from([
831    ///     (0, vec!["E1", "E2"]),
832    ///     (1, vec!["E1"]),
833    ///     (2, vec!["E1", "E2"]),
834    /// ]);
835    /// ```
836    fn load_element_history(&self, identifier: &Id) -> Result<HashMap<u64, Element>, StorageError>
837    where
838        Id: Eq + Serialize + for<'a> Deserialize<'a>,
839        Element: for<'a> Deserialize<'a>,
840    {
841        let mut iterations = self.get_all_iterations()?;
842        iterations.sort();
843        let mut started_gathering = false;
844        let mut stop_gathering = false;
845        let results = iterations
846            .iter()
847            .filter_map(|&iteration| {
848                if stop_gathering {
849                    None
850                } else {
851                    match self.load_single_element(iteration, identifier) {
852                        Ok(Some(element)) => {
853                            started_gathering = true;
854                            Some(Ok((iteration, element)))
855                        }
856                        Ok(None) => {
857                            if started_gathering {
858                                stop_gathering = true;
859                            }
860                            None
861                        }
862                        Err(e) => Some(Err(e)),
863                    }
864                }
865            })
866            .collect::<Result<HashMap<u64, _>, StorageError>>()?;
867        Ok(results)
868    }
869
    /// Gets a snapshot of all elements at a given iteration.
    ///
    /// The returned map associates the identifier of every element stored at `iteration` with
    /// the element itself.
    ///
    /// This function might be useful when implementing how simulations can be restored from saved
    /// results.
    ///
    /// Note: the file-based implementation for `StorageWrapper` returns an empty map when
    /// `iteration` was never stored; other backends may behave differently.
    fn load_all_elements_at_iteration(
        &self,
        iteration: u64,
    ) -> Result<HashMap<Id, Element>, StorageError>
    where
        Id: std::hash::Hash + std::cmp::Eq + for<'a> Deserialize<'a>,
        Element: for<'a> Deserialize<'a>;
881
    /// Get all iteration values which have been saved.
    ///
    /// This trait does not guarantee any particular order of the returned values. Callers that
    /// need the iterations in ascending order (such as the default implementation of
    /// `load_element_history`) must sort the result themselves.
    fn get_all_iterations(&self) -> Result<Vec<u64>, StorageError>;
884
885    /// Loads all elements for every iteration.
886    /// This will yield the complete storage and may result in extremely large allocations of
887    /// memory.
888    fn load_all_elements(&self) -> Result<BTreeMap<u64, HashMap<Id, Element>>, StorageError>
889    where
890        Id: std::hash::Hash + std::cmp::Eq + for<'a> Deserialize<'a>,
891        Element: for<'a> Deserialize<'a>,
892    {
893        let iterations = self.get_all_iterations()?;
894        let all_elements = iterations
895            .iter()
896            .map(|iteration| {
897                let elements = self.load_all_elements_at_iteration(*iteration)?;
898                Ok((*iteration, elements))
899            })
900            .collect::<Result<BTreeMap<_, _>, StorageError>>()?;
901        Ok(all_elements)
902    }
903
904    /// Similarly to the [load_all_elements](StorageInterfaceLoad::load_all_elements) function,
905    /// but this function returns all elements as their histories.
906    fn load_all_element_histories(
907        &self,
908    ) -> Result<HashMap<Id, BTreeMap<u64, Element>>, StorageError>
909    where
910        Id: std::hash::Hash + std::cmp::Eq + for<'a> Deserialize<'a>,
911        Element: for<'a> Deserialize<'a>,
912    {
913        let all_elements = self.load_all_elements()?;
914        let reordered_elements: HashMap<Id, BTreeMap<u64, Element>> = all_elements
915            .into_iter()
916            .map(|(iteration, identifier_to_elements)| {
917                identifier_to_elements
918                    .into_iter()
919                    .map(move |(identifier, element)| (identifier, iteration, element))
920            })
921            .flatten()
922            .fold(
923                HashMap::new(),
924                |mut acc, (identifier, iteration, element)| {
925                    let existing_elements = acc.entry(identifier).or_default();
926                    existing_elements.insert(iteration, element);
927                    acc
928                },
929            );
930        Ok(reordered_elements)
931    }
932}
933
934impl<T, Id, Element> StorageInterfaceLoad<Id, Element> for StorageWrapper<T>
935where
936    T: FileBasedStorage<Id, Element>,
937{
938    fn load_single_element(
939        &self,
940        iteration: u64,
941        identifier: &Id,
942    ) -> Result<Option<Element>, StorageError>
943    where
944        Id: Eq + Serialize + for<'a> Deserialize<'a>,
945        Element: for<'a> Deserialize<'a>,
946    {
947        let iterations = self.get_all_iterations()?;
948        if iterations.contains(&iteration) {
949            // Get the path where the iteration folder is
950            let iteration_path = self.0.get_iteration_path(iteration);
951
952            // Load all elements which are inside this folder from batches and singles
953            for path in std::fs::read_dir(&iteration_path)? {
954                let p = path?.path();
955                let file = std::fs::OpenOptions::new().read(true).open(&p)?;
956
957                match p.file_stem() {
958                    Some(stem) => match stem.to_str() {
959                        Some(tail) => {
960                            let first_name_segment = tail.split("_").into_iter().next();
961                            if first_name_segment == Some("batch") {
962                                let result: BatchSaveFormat<Id, Element> =
963                                    self.0.from_reader(file)?;
964                                for json_save_format in result.data.into_iter() {
965                                    if &json_save_format.identifier == identifier {
966                                        return Ok(Some(json_save_format.element));
967                                    }
968                                }
969                            } else if first_name_segment == Some("single") {
970                                let result: CombinedSaveFormat<Id, Element> =
971                                    self.0.from_reader(file)?;
972                                if &result.identifier == identifier {
973                                    return Ok(Some(result.element));
974                                }
975                            }
976                        }
977                        None => (),
978                    },
979                    None => (),
980                }
981            }
982            return Ok(None);
983        } else {
984            return Ok(None);
985        }
986    }
987
988    fn load_all_elements_at_iteration(
989        &self,
990        iteration: u64,
991    ) -> Result<HashMap<Id, Element>, StorageError>
992    where
993        Id: std::hash::Hash + std::cmp::Eq + for<'a> Deserialize<'a>,
994        Element: for<'a> Deserialize<'a>,
995    {
996        let iterations = self.get_all_iterations()?;
997        if iterations.contains(&iteration) {
998            // Create a new empty hashmap
999            let mut all_elements_at_iteration = HashMap::new();
1000
1001            // Get the path where the iteration folder is
1002            let iteration_path = self.0.get_iteration_path(iteration);
1003
1004            // Load all elements which are inside this folder from batches and singles
1005            for path in std::fs::read_dir(&iteration_path)? {
1006                let p = path?.path();
1007                let file = std::fs::OpenOptions::new().read(true).open(&p)?;
1008
1009                match p.file_stem() {
1010                    Some(stem) => match stem.to_str() {
1011                        Some(tail) => {
1012                            let first_name_segment = tail.split("_").into_iter().next();
1013                            if first_name_segment == Some("batch") {
1014                                let result: BatchSaveFormat<Id, Element> =
1015                                    self.0.from_reader(file)?;
1016                                all_elements_at_iteration.extend(result.data.into_iter().map(
1017                                    |json_save_format| {
1018                                        (json_save_format.identifier, json_save_format.element)
1019                                    },
1020                                ));
1021                            } else if first_name_segment == Some("single") {
1022                                let result: CombinedSaveFormat<Id, Element> =
1023                                    self.0.from_reader(file)?;
1024                                all_elements_at_iteration
1025                                    .extend([(result.identifier, result.element)]);
1026                            }
1027                        }
1028                        None => (),
1029                    },
1030                    None => (),
1031                }
1032            }
1033            return Ok(all_elements_at_iteration);
1034        } else {
1035            return Ok(HashMap::new());
1036        }
1037    }
1038
1039    fn get_all_iterations(&self) -> Result<Vec<u64>, StorageError> {
1040        let paths = std::fs::read_dir(&self.0.get_path())?;
1041        paths
1042            .into_iter()
1043            .filter_map(|path| match path {
1044                Ok(p) => match self.0.folder_name_to_iteration(&p.path()) {
1045                    Ok(Some(entry)) => Some(Ok(entry)),
1046                    Ok(None) => None,
1047                    Err(e) => Some(Err(e)),
1048                },
1049                Err(_) => None,
1050            })
1051            .collect::<Result<Vec<_>, _>>()
1052    }
1053}
1054
1055/// Provide methods to initialize, store and load single and multiple elements at iterations.
1056pub trait StorageInterface<Id, Element>:
1057    StorageInterfaceOpen + StorageInterfaceLoad<Id, Element> + StorageInterfaceStore<Id, Element>
1058{
1059}
1060
1061impl<Id, Element, T> StorageInterface<Id, Element> for T
1062where
1063    T: StorageInterfaceLoad<Id, Element>,
1064    T: StorageInterfaceStore<Id, Element>,
1065    T: StorageInterfaceOpen,
1066{
1067}