config_it/config/
storage.rs

1//! The `storage` module provides a high-level interface to interact with configuration data,
2//! ensuring safety, flexibility, and efficiency.
3//!
4//! This module primarily revolves around the `Storage` struct, which serves as the main access
5//! point for users to interact with the underlying storage system. By abstracting intricate
6//! operations into straightforward methods, it simplifies user interaction with stored
7//! configuration data.
8//!
9//! Key features include:
10//! - **Data Retrieval and Creation**: Safely find or create items with `find_or_create`.
11//! - **Data Import/Export**: Handle complex serialization and deserialization logic seamlessly with
12//!   `import` and `exporter`.
13//! - **Monitoring**: Integrate external monitoring systems and receive updates about storage
14//!   modifications using the `replace_monitor`, `unset_monitor`, and `notify_editions` methods.
//! - **Encryption Support**: Securely encrypt data (when the `crypt` feature is enabled) using
//!   `set_crypt_key`.
17
18use std::{
19    any::{Any, TypeId},
20    mem::replace,
21    sync::{
22        atomic::{AtomicU64, Ordering},
23        Arc, Weak,
24    },
25};
26
27use strseq::SharedStringSequence;
28
29use crate::{
30    config::{entity, noti},
31    shared::{archive, GroupID, ItemID, PathHash},
32};
33
34use super::{
35    entity::EntityEventHook,
36    group::{self, GroupContext},
37};
38
39/* ---------------------------------------------------------------------------------------------- */
40/*                                      STORAGE MONITOR TRAIT                                     */
41/* ---------------------------------------------------------------------------------------------- */
42
/// Observes every action performed on a storage. A monitor that tracks every event of a single
/// storage can replicate that storage's internal state.
///
/// Monitor should properly handle property flags, such as `ADMIN`, `READONLY`, `SECRET`, etc ...
///
/// Blocking inside a callback may cause a deadlock, so a monitor should be implemented in a
/// non-blocking manner (e.g. by forwarding events to a channel).
///
/// Every method has a no-op default body, so an implementor only overrides the events it needs.
pub trait Monitor: Send + Sync + 'static {
    /// Called when a new group is added.
    fn group_added(&mut self, group_id: GroupID, group: &Arc<GroupContext>) {
        let _ = (group_id, group);
    }

    /// Called when a group is removed. May be called with a `group_id` this monitor was never
    /// told about if the monitor is being replaced at the same time.
    fn group_removed(&mut self, group_id: GroupID) {
        let _ = group_id;
    }

    /// Called when any entity value is updated. May be called with a `group_id` this monitor was
    /// never told about if the monitor is being replaced at the same time.
    ///
    /// Since this is called frequently compared to the group modification callbacks, it receives
    /// an immutable self reference. Therefore, all state modification must be handled with
    /// interior mutability!
    fn entity_value_updated(&self, group_id: GroupID, item_id: ItemID) {
        let _ = (group_id, item_id);
    }
}
71
72/* ---------------------------------------------------------------------------------------------- */
73/*                                           STORAGE API                                          */
74/* ---------------------------------------------------------------------------------------------- */
/// Provides a high-level, thread-safe interface to the configuration storage system.
///
/// `Storage` acts as the primary access point for users to interact with the underlying storage
/// system. It is a thin handle around a reference-counted `inner::Inner`, so cloning a `Storage`
/// yields another handle to the same underlying storage rather than a copy of its contents.
///
/// With `Storage`, users can perform read and write operations on their configuration data from
/// multiple threads; the shared state inside `inner::Inner` is guarded by locks.
///
/// # Features:
/// - **Thread Safety**: Guarantees safe concurrent access to the configuration storage.
/// - **High-level Interface**: Abstracts the complexities of direct storage interactions, offering
///   a user-friendly API.
#[derive(Debug, Default, Clone)]
pub struct Storage(Arc<inner::Inner>);
91
/// Error returned by [`Storage::find`].
#[derive(thiserror::Error, Debug)]
pub enum GroupFindError {
    /// No live group is registered under the requested path. This is also reported when the
    /// group was found but is being disposed at the same moment.
    #[error("Given path was not found")]
    PathNotFound,
    /// A group exists at the path, but it was registered with a different template type.
    #[error("Type ID mismatch from original registration")]
    MismatchedTypeID,
}
99
/// Error returned by [`Storage::find_or_create`].
#[derive(thiserror::Error, Debug)]
pub enum GroupFindOrCreateError {
    /// A group exists at the path, but it was registered with a different template type.
    #[error("Type ID mismatch from original registration")]
    MismatchedTypeID,
}
105
/// Error returned by [`Storage::create`]. Both variants carry the path that collided.
#[derive(thiserror::Error, Debug)]
pub enum GroupCreationError {
    /// A group with the same path was already present when creation started.
    #[error("Path name duplicated, found early! Path was: {0:?}")]
    PathCollisionEarly(SharedStringSequence),
    /// Another group claimed the same path while this one was being registered.
    #[error("Path name duplication found during registeration. Path was: {0:?}")]
    PathCollisionRace(SharedStringSequence),
}
113
114impl Storage {
115    /// Gets ID of this storage instance. ID is unique per single program instance.
116    pub fn storage_id(&self) -> crate::shared::StorageID {
117        self.0.id
118    }
119
120    /// Searches for an existing item of type `T` in the storage, or creates a new one if it doesn't
121    /// exist.
122    ///
123    /// # Arguments
124    ///
125    /// * `T` - The type of item to search for or create.
126    ///
127    /// # Returns
128    ///
129    /// A reference to the existing or newly created item of type `T`. Value remains in template
130    /// default until you call first `update()` on it.
131    pub fn find_or_create<'a, T>(
132        &self,
133        path: impl IntoIterator<Item = impl AsRef<str> + 'a>,
134    ) -> Result<group::Group<T>, GroupFindOrCreateError>
135    where
136        T: group::Template,
137    {
138        let keys = SharedStringSequence::from_iter(path);
139        let path_hash = PathHash::new(keys.iter());
140
141        use GroupCreationError as GCE;
142        use GroupFindError as GFE;
143        use GroupFindOrCreateError as GFOE;
144
145        loop {
146            match self.find(path_hash) {
147                Ok(found) => break Ok(found),
148                Err(GFE::MismatchedTypeID) => break Err(GFOE::MismatchedTypeID),
149                Err(GFE::PathNotFound) => {}
150            }
151
152            match self.create_impl::<T>(keys.clone()) {
153                Ok(created) => break Ok(created),
154
155                // Simply retry on path collision
156                Err(GCE::PathCollisionEarly(_) | GCE::PathCollisionRace(_)) => continue,
157            }
158        }
159    }
160
161    /// Find a group with the given path and template type.
162    ///
163    /// # Arguments
164    ///
165    /// * `path` - The path of the group to find.
166    ///
167    /// # Type Parameters
168    ///
169    /// * `T` - The type of the template to use for the group.
170    ///
171    /// # Returns
172    ///
173    /// Returns a `Result` containing the found group or a `GroupFindError` if the group was not
174    /// found or if the template type does not match the expected type. Value remains in template
175    /// default until you call first `update()` on it.
176    pub fn find<T: group::Template>(
177        &self,
178        path: impl Into<PathHash>,
179    ) -> Result<group::Group<T>, GroupFindError> {
180        let path_hash = path.into();
181
182        if let Some(group) = self.0.find_group(&path_hash) {
183            if group.template_type_id != std::any::TypeId::of::<T>() {
184                Err(GroupFindError::MismatchedTypeID)
185            } else if let Some(anchor) = group.w_unregister_hook.upgrade() {
186                Ok(group::Group::create_with__(group, anchor))
187            } else {
188                // This is corner case where group was disposed during `find_group` is invoked.
189                Err(GroupFindError::PathNotFound)
190            }
191        } else {
192            Err(GroupFindError::PathNotFound)
193        }
194    }
195
196    /// Creates a new instance of the `Storage` struct with the specified type parameter. Value
197    /// remains in template default until you call first `update()` on it.
198    pub fn create<'a, T>(
199        &self,
200        path: impl IntoIterator<Item = &'a (impl AsRef<str> + ?Sized + 'a)>,
201    ) -> Result<group::Group<T>, GroupCreationError>
202    where
203        T: group::Template,
204    {
205        self.create_impl(path.into_iter().collect())
206    }
207
208    fn create_impl<T: group::Template>(
209        &self,
210        path: SharedStringSequence,
211    ) -> Result<group::Group<T>, GroupCreationError> {
212        assert!(!path.is_empty());
213        assert!(path.iter().all(|x| !x.is_empty()));
214
215        let path_hash = PathHash::new(path.iter());
216
217        // Naively check if there's already existing group with same path.
218        if self.0.find_group(&path_hash).is_some() {
219            return Err(GroupCreationError::PathCollisionEarly(path));
220        }
221
222        // This ID may not be used if group creation failed ... it's generally okay since we have
223        // 2^63 trials.
224        let register_id = GroupID::new_unique_incremental();
225        let entity_hook = Arc::new(EntityHookImpl { register_id, inner: Arc::downgrade(&self.0) });
226
227        debug_assert!(
228            T::props__().windows(2).all(|x| x[0].index + 1 == x[1].index),
229            "Something wrong with property generation"
230        );
231
232        let sources: Vec<_> = T::props__()
233            .iter()
234            .map(|prop| entity::EntityData::new(prop, entity_hook.clone()))
235            .collect();
236
237        // Drops the group when the final group instance is dropped.
238        let unregister_anchor = Arc::new(GroupUnregisterHook {
239            register_id,
240            path_hash,
241            inner: Arc::downgrade(&self.0),
242        });
243
244        // Create core config set context with reflected target metadata set
245        let tx_noti = noti::Sender::new();
246        let context = Arc::new(GroupContext {
247            group_id: register_id,
248            template_type_id: TypeId::of::<T>(),
249            template_name: T::template_name(),
250            w_unregister_hook: Arc::downgrade(
251                &(unregister_anchor.clone() as Arc<dyn Any + Send + Sync>),
252            ),
253            sources: sources.into(),
254            version: AtomicU64::new(1), // NOTE: This will trigger initial check_update() always.
255            update_receiver_channel: tx_noti.receiver(true),
256            path: path.clone(),
257        });
258
259        self.0
260            .register_group(path_hash, context, tx_noti)
261            .map(|context| group::Group::create_with__(context, unregister_anchor))
262    }
263
264    /// Create internal archive export task.
265    ///
266    /// You should explicitly call `confirm()` to retrieve the exported archive explcitly.
267    pub fn exporter(&self) -> inner::ExportTask {
268        inner::ExportTask::new(&self.0)
269    }
270
271    /// Deserializes the data.
272    ///
273    /// # Data Serialization Rules:
274    /// - The root component is the first path component and is written as-is.
275    /// - Subsequent path components must be prefixed with a `~` (tilde) character.
276    ///   - If not prefixed, they are treated as a field element of the enclosing path component.
277    /// - A key prefixed with '~' within an existing field is ...
278    ///   (Note: The comment here seems to be incomplete; please provide further details.)
279    ///
280    /// # Example JSON structure:
281    /// ```json
282    /// {
283    ///     "root_path": {
284    ///         "~path_component": {
285    ///             "field_name": "value",
286    ///             "other_field": {
287    ///                 "~this_is_not_treated_as_path": 123
288    ///             }
289    ///         },
290    ///         "~another_path_component": {},
291    ///         "field_name_of_root_path": "yay"
292    ///     },
293    ///     "another_root_path": {}
294    /// }
295    /// ```
296    ///
297    /// # Returns
298    ///
299    /// An instance of `ImportOnDrop` which will handle the import operation.
300    pub fn import(&self, archive: archive::Archive) -> inner::ImportOnDrop {
301        inner::ImportOnDrop::new(&self.0, archive)
302    }
303
304    /// Replaces the current monitor with the provided one.
305    ///
306    /// This function dumps the active list of groups to the new monitor sequentially. If the
307    /// monitor is not efficiently implemented, this operation can significantly impact the
308    /// performance of all storage consumers and replicators. Therefore, exercise caution when
309    /// replacing the monitor on a storage instance that's under heavy use.
310    ///
311    /// # Arguments
312    ///
313    /// * `handler` - The new monitor to replace the current one.
314    ///
315    /// # Returns
316    ///
317    /// The previous monitor that has been replaced.
318    pub fn replace_monitor(&self, handler: Box<impl Monitor>) -> Box<dyn Monitor> {
319        self.0.replace_monitor(handler)
320    }
321
322    /// Unset monitor instance.
323    pub fn unset_monitor(&self) {
324        *self.0.monitor.write() = Box::new(inner::EmptyMonitor);
325    }
326
327    /// Send monitor event to storage driver.
328    pub fn notify_editions(&self, items: impl IntoIterator<Item = GroupID>) {
329        for group in items {
330            self.0.notify_edition(group);
331        }
332    }
333
334    /// Sets the encryption key.
335    ///
336    /// If the encryption key is not provided before the first encryption or decryption operation, it will be automatically generated based on the machine's unique identifier (UID). If machine-UID generation is not supported, a predefined, hard-coded sequence will be used as the key.
337    ///
338    /// # Arguments
339    ///
340    /// * `key` - Byte slice representing the encryption key.
341    #[cfg(feature = "crypt")]
342    pub fn set_crypt_key(&self, key: impl AsRef<[u8]>) {
343        use sha2::{Digest, Sha256};
344        let arr = &*Sha256::new().chain_update(key).finalize();
345        self.0.crypt_key.write().replace(std::array::from_fn(|index| arr[index]));
346    }
347}
348
349/* ---------------------------------------------------------------------------------------------- */
350/*                                            INTERNALS                                           */
351/* ---------------------------------------------------------------------------------------------- */
352
/// Drop guard for a registered group: dropping it unregisters the group from its storage.
struct GroupUnregisterHook {
    /// ID of the group to unregister.
    register_id: GroupID,
    /// Path hash under which the group was registered.
    path_hash: PathHash,
    /// Weak handle to the owning storage, so the hook does not keep the storage alive.
    inner: Weak<inner::Inner>,
}
358
359impl Drop for GroupUnregisterHook {
360    fn drop(&mut self) {
361        // Just ignore result. If channel was closed before the set is unregistered,
362        //  it's ok to ignore this operation silently.
363        let Some(inner) = self.inner.upgrade() else { return };
364        inner.unregister_group(self.register_id, self.path_hash);
365    }
366}
367
/// Entity event hook that forwards value changes of one group's entities to the owning storage.
struct EntityHookImpl {
    /// ID of the group whose entities use this hook.
    register_id: GroupID,
    /// Weak handle to the owning storage, so the hook does not keep the storage alive.
    inner: Weak<inner::Inner>,
}
372
373impl EntityEventHook for EntityHookImpl {
374    fn on_value_changed(&self, data: &entity::EntityData, silent: bool) {
375        // Update notification is transient, thus when storage driver is busy, it can
376        //  just be dropped.
377        let Some(inner) = self.inner.upgrade() else { return };
378        inner.on_value_update(self.register_id, data, silent);
379    }
380}
381
382mod inner {
383    use std::{collections::HashMap, mem::ManuallyDrop};
384
385    use derive_setters::Setters;
386    use parking_lot::RwLock;
387
388    use crate::{
389        config::entity::Entity,
390        shared::{archive::Archive, meta::MetaFlag, StorageID},
391    };
392
393    use super::*;
394
    /// Manages and drives internal storage events.
    ///
    /// Primarily responsible for handling update requests and orchestrating
    /// the underlying storage mechanisms.
    #[derive(cs::Debug)]
    pub(super) struct Inner {
        /// Unique(during runtime) identifier for this storage.
        pub id: StorageID,

        /// Maintains a registry of all configuration sets within this storage.
        ///
        /// The key is the group's unique identifier, `GroupID`.
        all_groups: RwLock<HashMap<GroupID, GroupRegistration>>,

        /// The monitor currently attached to this storage.
        ///
        /// Exactly one monitor is installed at any time; when the user has not set one, a no-op
        /// `EmptyMonitor` takes its place. Group additions/removals and entity value updates are
        /// reported to it.
        #[debug(with = "fmt_monitor")]
        pub monitor: RwLock<Box<dyn Monitor>>,

        /// Keeps track of registered path hashes to quickly identify potential path name
        /// duplications.
        ///
        /// Uses the path's hash representation as the key and its corresponding `GroupID` as the
        /// value.
        path_hashes: RwLock<HashMap<PathHash, GroupID>>,

        /// Holds a cached version of the archive. This may include content for groups that are
        /// currently non-existent.
        pub archive: RwLock<archive::Archive>,

        /// AES-256 encryption key for securing data.
        ///
        /// This key is used when the encryption feature is enabled. `None` means no key has been
        /// set explicitly.
        #[cfg(feature = "crypt")]
        #[debug(with = "fmt_encryption_key")]
        pub crypt_key: RwLock<Option<[u8; 32]>>,
    }
436
    /// `Debug` formatter for the `monitor` field: prints the address of the installed monitor
    /// wrapped in `Some`, or `None` when the monitor is considered to be the empty placeholder.
    ///
    /// NOTE(review): `EmptyMonitor` is a zero-sized type and the `&EmptyMonitor` below is a
    /// separate temporary, not the boxed instance stored in `Inner`. The address comparison is
    /// therefore unlikely to ever detect the placeholder, so this probably always prints `Some`.
    /// Confirm, and consider a type-based check instead.
    fn fmt_monitor(
        monitor: &RwLock<Box<dyn Monitor>>,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        // Pointer to the monitor object behind the box (read lock held only for this statement).
        let ptr = &**monitor.read() as *const _;

        // Compare only the data addresses of `ptr` and `&EmptyMonitor` (vtable part discarded).
        let exists = ptr as *const () != &EmptyMonitor as *const _ as *const ();

        write!(f, "{:?}", exists.then_some(ptr))
    }
448
449    #[cfg(feature = "crypt")]
450    fn fmt_encryption_key(
451        key: &RwLock<Option<[u8; 32]>>,
452        f: &mut std::fmt::Formatter<'_>,
453    ) -> std::fmt::Result {
454        let exists = key.read().is_some();
455        write!(f, "{:?}", exists)
456    }
457
    /// A dummy monitor type which represents the absence of a monitor. It relies entirely on the
    /// no-op default methods of `Monitor`.
    pub(super) struct EmptyMonitor;
    impl Monitor for EmptyMonitor {}
461
462    impl Default for Inner {
463        fn default() -> Self {
464            Self::new(Box::new(EmptyMonitor))
465        }
466    }
467
    /// Bookkeeping entry stored in `all_groups` for each registered group.
    #[derive(Debug)]
    struct GroupRegistration {
        /// Shared context of the registered group.
        context: Arc<GroupContext>,
        /// Sender used to notify group instances after the group's version has been bumped.
        evt_on_update: noti::Sender,
    }
473
474    impl Inner {
475        pub fn new(monitor: Box<dyn Monitor>) -> Self {
476            Self {
477                id: StorageID::new_unique_incremental(),
478                monitor: RwLock::new(monitor),
479                archive: Default::default(),
480                #[cfg(feature = "crypt")]
481                crypt_key: Default::default(),
482
483                // NOTE: Uses 4 shards for both maps. The default implementation's shared amount,
484                all_groups: Default::default(),
485                path_hashes: Default::default(),
486            }
487        }
488
489        pub fn notify_edition(&self, group_id: GroupID) {
490            if let Some(group) = self.all_groups.read().get(&group_id) {
491                group.context.version.fetch_add(1, Ordering::Relaxed);
492                group.evt_on_update.notify();
493            }
494        }
495
496        pub fn find_group(&self, path_hash: &PathHash) -> Option<Arc<GroupContext>> {
497            self.path_hashes
498                .read()
499                .get(path_hash)
500                .and_then(|id| self.all_groups.read().get(id).map(|x| x.context.clone()))
501        }
502
        /// Registers a freshly created group under `path_hash`.
        ///
        /// Values cached in the archive for the group's path are loaded into the group first.
        /// The group is then inserted into the registry, its path hash is claimed, and finally
        /// the monitor is told about the new group.
        ///
        /// # Errors
        ///
        /// Returns `GroupCreationError::PathCollisionRace` when another group already owns
        /// `path_hash`; the registry entry inserted by this call is removed again in that case.
        ///
        /// # Panics
        ///
        /// Panics if a group with the same `GroupID` is already registered.
        pub fn register_group(
            &self,
            path_hash: PathHash,
            context: Arc<GroupContext>,
            evt_on_update: noti::Sender,
        ) -> Result<Arc<GroupContext>, GroupCreationError> {
            // Path-hash to GroupID mappings might experience collisions due to simultaneous access.
            // To ensure integrity, only consider the group insertion successful when its path_hash
            // is successfully registered to its corresponding group ID.
            let group_id = context.group_id;
            let inserted = context.clone();
            let rg = GroupRegistration { context, evt_on_update };

            // If the path already exists in the archive, load the corresponding node.
            // `EmptyMonitor` is passed here, presumably so that the real monitor is not notified
            // about values of a group it has not been told about yet.
            if let Some(node) = self.archive.read().find_path(rg.context.path.iter()) {
                Self::load_node(
                    &rg.context,
                    node,
                    &EmptyMonitor,
                    #[cfg(feature = "crypt")]
                    Self::crypt_key_loader(&self.crypt_key),
                );
            }

            // Ensure that the Group ID is unique. The insertion lives inside `assert!`, which is
            // always compiled in, so it is performed in release builds as well.
            assert!(
                self.all_groups.write().insert(group_id, rg).is_none(),
                "Group IDs must be unique." // A duplicate would mean the ID space was exhausted.
            );

            // Check for path-hash collisions. In the rare case where a collision occurs due to
            // another thread registering the same path-hash, we remove the current group
            // registration and return an error.
            if self.path_hashes.write().entry(path_hash).or_insert(group_id) != &group_id {
                self.all_groups.write().remove(&group_id);
                return Err(GroupCreationError::PathCollisionRace(inserted.path.clone()));
            }

            // Notify the monitor that a new group has been added.
            self.monitor.write().group_added(group_id, &inserted);
            Ok(inserted)
        }
545
546        pub fn unregister_group(&self, group_id: GroupID, path_hash: PathHash) {
547            {
548                let mut path_hashes = self.path_hashes.write();
549                if !path_hashes.get(&path_hash).is_some_and(|v| *v == group_id) {
550                    tr::debug!(?group_id, ?path_hash, "unregister_group() call to unexist group");
551                    return;
552                };
553
554                path_hashes.remove(&path_hash);
555            }
556
557            if let Some(ctx) = self.all_groups.write().remove(&group_id) {
558                let _s = tr::info_span!(
559                    "unregister_group()",
560                    template = ?ctx.context.template_name,
561                    path = ?ctx.context.path
562                );
563
564                // Consider the removal valid only if the group was previously and validly created.
565                // The method `all_groups.remove` might return `None` if the group was not
566                // successfully registered during `register_group`. If this happens, this function
567                // gets invoked during the disposal of `GroupUnregisterHook` within the
568                // `create_impl` function.
569
570                // For valid removals, add contents to the cached archive.
571                Self::dump_node(
572                    &ctx.context,
573                    &mut self.archive.write(),
574                    #[cfg(feature = "crypt")]
575                    Self::crypt_key_loader(&self.crypt_key),
576                );
577
578                // Notify about the removal
579                self.monitor.write().group_removed(group_id);
580            }
581        }
582
583        pub fn on_value_update(&self, group_id: GroupID, data: &entity::EntityData, silent: bool) {
584            // Monitor should always be notified on value update, regardless of silent flag
585            self.monitor.read().entity_value_updated(group_id, data.id);
586
587            // If silent flag is set, skip internal notify to other instances.
588            if silent {
589                return;
590            }
591
592            // This is trivially fallible operation.
593            if let Some(group) = self.all_groups.read().get(&group_id) {
594                group.context.version.fetch_add(1, Ordering::Relaxed);
595                group.evt_on_update.notify();
596            }
597        }
598
        /// Installs `new_monitor`, replays every currently registered group to it through
        /// `group_added`, and returns the monitor that was installed before.
        pub fn replace_monitor(&self, new_monitor: Box<dyn Monitor>) -> Box<dyn Monitor> {
            // At the start of the operation, we replace the monitor. This means that before we add all
            // valid groups to the new monitor, there may be notifications about group removals or updates.
            // Without redesigning the entire storage system into a more expensive locking mechanism,
            // there's no way to avoid this. We assume that monitor implementation providers will handle
            // any incorrect updates gracefully and thus, we are ignoring this case.

            let old_monitor = replace(&mut *self.monitor.write(), new_monitor);

            // NOTE: Consistency while the new monitor is being initialized:
            // - The read guard on `path_hashes` is a temporary of the `for` head expression, so
            //   it is held for the whole loop. No path hash can be claimed or released meanwhile.
            // - Registration inserts into `all_groups` before claiming the path hash, and
            //   unregistration releases the path hash before removing from `all_groups`.
            //   Therefore every group ID seen in `path_hashes` must exist in `all_groups`.
            // - NOTE(review): a group whose registration has claimed its path hash but has not
            //   yet reached its own `group_added` call would be reported twice to the new
            //   monitor — confirm monitors tolerate this.
            for group_id in self.path_hashes.read().values() {
                let all_groups = self.all_groups.read();
                let Some(group) = all_groups.get(group_id) else {
                    unreachable!(
                        "As long as the group_id is found from path_hashes, \
                        the group must be found from `all_groups`."
                    )
                };

                // Replay the group to the newly installed monitor.
                self.monitor.write().group_added(*group_id, &group.context);
            }

            old_monitor
        }
632
        /// ⚠️ **CAUTION!** Do NOT alter this literal! Any modification will DESTROY all existing
        /// encrypted data irreparably! ⚠️
        ///
        /// Marker prepended to the base64 text of every encrypted value. A string value is treated
        /// as encrypted only when it starts with this prefix. The prefix consists solely of
        /// base64 alphabet characters and is a decorated spelling of the word 'secret'.
        #[cfg(feature = "crypt")]
        const CRYPT_PREFIX: &'static str = "+/+sE/cRE+t//";
639
640        #[cfg(feature = "crypt")]
641        fn crypt_key_loader(
642            key: &RwLock<Option<[u8; 32]>>,
643        ) -> impl Fn() -> Option<[u8; 32]> + '_ + Copy {
644            || key.read().or_else(Self::crypt_sys_key)
645        }
646
        /// Uses hard coded NONCE, just to run the algorithm.
        ///
        /// NOTE(review): the same nonce is used for every encrypted value. AES-GCM generally
        /// expects a unique nonce per (key, message) — confirm this trade-off is acceptable.
        /// Changing the value would make existing encrypted data undecryptable.
        #[cfg(feature = "crypt")]
        const CRYPT_NONCE: [u8; 12] = [15, 43, 5, 12, 6, 66, 126, 231, 141, 18, 33, 71];
650
651        #[cfg(feature = "crypt")]
652        fn crypt_sys_key() -> Option<[u8; 32]> {
653            #[cfg(feature = "crypt-machine-id")]
654            {
655                use std::sync::OnceLock;
656                static CACHED: OnceLock<Option<[u8; 32]>> = OnceLock::new();
657
658                *CACHED.get_or_init(|| {
659                    machine_uid::get().ok().map(|uid| {
660                        use sha2::{Digest, Sha256};
661                        let arr = &*Sha256::new().chain_update(uid).finalize();
662                        std::array::from_fn(|index| arr[index])
663                    })
664                })
665            }
666
667            #[cfg(not(feature = "crypt-machine-id"))]
668            {
669                None
670            }
671        }
672
673        fn dump_node(
674            ctx: &GroupContext,
675            archive: &mut archive::Archive,
676            #[cfg(feature = "crypt")] crypt_key_loader: impl Fn() -> Option<[u8; 32]>,
677        ) {
678            let _s = tr::info_span!("dump_node()", template=?ctx.template_name, path=?ctx.path);
679
680            let paths = ctx.path.iter();
681            let node = archive.find_or_create_path_mut(paths);
682
683            // Clear existing values before dumping.
684            node.values.clear();
685
686            #[cfg(feature = "crypt")]
687            let mut crypt_key = None;
688
689            '_outer: for (meta, val) in ctx
690                .sources
691                .iter()
692                .map(|e| e.property_value())
693                .filter(|(meta, _)| !meta.metadata.flags.contains(MetaFlag::NO_EXPORT))
694            {
695                let _s = tr::info_span!("node dump", varname=?meta.varname);
696                let dst = node.values.entry(meta.name.into()).or_default();
697
698                #[cfg(feature = "crypt")]
699                'encryption: {
700                    use aes_gcm::aead::{Aead, KeyInit};
701                    use base64::prelude::*;
702
703                    if !meta.metadata.flags.contains(MetaFlag::SECRET) {
704                        break 'encryption;
705                    }
706
707                    if crypt_key.is_none() {
708                        crypt_key = Some(crypt_key_loader().ok_or(()));
709                    }
710
711                    // Check if key was correctly loaded. If not, skip serialization itself to not
712                    // export delicate data.
713                    let Ok(key) = crypt_key.as_ref().unwrap() else {
714                        tr::warn!("Crypt key missing. Skipping secret data serialization.");
715                        continue '_outer;
716                    };
717                    let Ok(json) = serde_json::to_vec(val.as_serialize()) else {
718                        tr::warn!("JSON dump failed");
719                        continue '_outer;
720                    };
721
722                    let cipher = aes_gcm::Aes256Gcm::new(key.into());
723                    let Ok(enc) = cipher.encrypt(&Self::CRYPT_NONCE.into(), &json[..]) else {
724                        tr::warn!("Encryption failed");
725                        continue '_outer;
726                    };
727
728                    *dst = serde_json::Value::String(format!(
729                        "{}{}",
730                        Self::CRYPT_PREFIX,
731                        BASE64_STANDARD_NO_PAD.encode(&enc)
732                    ));
733
734                    continue '_outer;
735                }
736
737                #[cfg(not(feature = "crypt"))]
738                if meta.metadata.flags.contains(MetaFlag::SECRET) {
739                    tr::warn!("`crypt` Feature disabled: Skipping secret data serialization.");
740                    continue;
741                }
742
743                match serde_json::to_value(val.as_serialize()) {
744                    Ok(val) => *dst = val,
745                    Err(error) => {
746                        tr::warn!(%error, "JSON dump failed");
747                    }
748                }
749            }
750        }
751
752        fn load_node(
753            ctx: &GroupContext,
754            node: &archive::Archive,
755            monitor: &dyn Monitor,
756            #[cfg(feature = "crypt")] crypt_key_loader: impl Fn() -> Option<[u8; 32]>,
757        ) -> bool {
758            let _s = tr::info_span!("load_node()", template=?ctx.template_name, path=?ctx.path);
759            let mut has_update = false;
760
761            #[cfg(feature = "crypt")]
762            let mut crypt_key = None;
763
764            '_outer: for (elem, de) in ctx
765                .sources
766                .iter()
767                .filter(|e| !e.meta.flags.contains(MetaFlag::NO_IMPORT))
768                .filter_map(|x| node.values.get(x.meta.name).map(|o| (x, o)))
769            {
770                let _s = tr::info_span!("node load", varname=?elem.meta.varname);
771
772                #[allow(unused_mut)]
773                let mut update_result = None;
774
775                #[cfg(feature = "crypt")]
776                'decryption: {
777                    use aes_gcm::aead::{Aead, KeyInit};
778                    use base64::prelude::*;
779
780                    if !elem.meta.flags.contains(MetaFlag::SECRET) {
781                        // Just try to deserialize from plain value.
782                        break 'decryption;
783                    }
784
785                    // Non-string value is not an encrypted property. serve as-is.
786                    let Some(str) = de.as_str() else { break 'decryption };
787
788                    // Verify if it is encrpyted string repr.
789                    if !str.starts_with(Self::CRYPT_PREFIX) {
790                        tr::debug!("Non-encrypted string repr. serve as-is.");
791                        break 'decryption;
792                    }
793
794                    let str = &str[Self::CRYPT_PREFIX.len()..];
795                    let Ok(bin) = BASE64_STANDARD_NO_PAD.decode(str).map_err(|error| {
796                        tr::debug!(
797                            %error,
798                            "Crypt-prefixed string is not valid base64. \
799                             Trying to parse as plain string."
800                        )
801                    }) else {
802                        break 'decryption;
803                    };
804
805                    if crypt_key.is_none() {
806                        crypt_key = Some(crypt_key_loader().ok_or(()));
807                    }
808
809                    // Check if key was correctly loaded. If not, just try to parse string as plain
810                    // string ... which woun't be very useful though.
811                    let Ok(key) = crypt_key.as_ref().unwrap() else {
812                        tr::warn!("Crypt key missing. Skipping secret data serialization.");
813                        break 'decryption;
814                    };
815
816                    let cipher = aes_gcm::Aes256Gcm::new(key.into());
817                    let Ok(json) =
818                        cipher.decrypt(&Self::CRYPT_NONCE.into(), &bin[..]).map_err(|error| {
819                            tr::warn!(%error, "Failed to decrypt secret data");
820                        })
821                    else {
822                        break 'decryption;
823                    };
824
825                    update_result = Some(
826                        elem.update_value_from(&mut serde_json::Deserializer::from_slice(&json)),
827                    );
828                }
829
830                match update_result.unwrap_or_else(|| elem.update_value_from(de)) {
831                    Ok(_) => {
832                        has_update = true;
833                        monitor.entity_value_updated(ctx.group_id, elem.id);
834                    }
835                    Err(error) => {
836                        tr::warn!(%error, "Element value update error during node loading")
837                    }
838                }
839            }
840
841            if has_update {
842                // On successful load, set its fence value as 1, to make the first client
843                //  side's call to `update()` call would be triggered.
844                ctx.version.fetch_add(1, Ordering::Release);
845            }
846
847            has_update
848        }
849    }
850
851    /* ------------------------------------ Import Operation ------------------------------------ */
    /// A pending import of an [`Archive`] into a storage.
    ///
    /// The import itself is carried out when this value is dropped (see the `Drop`
    /// implementation); until then the options below can be adjusted.
    #[derive(Setters)]
    #[setters(borrow_self)]
    pub struct ImportOnDrop<'a> {
        /// Storage internals the archive is imported into.
        #[setters(skip)]
        inner: &'a Inner,

        /// The archive to import. Held in `ManuallyDrop` so that it can be moved out by value
        /// inside `drop`.
        #[setters(skip)]
        archive: ManuallyDrop<Archive>,

        /// When set to true, the imported config will be merged with the existing cache. This is
        /// typically useful to prevent unsaved archive entities from being overwritten.
        ///
        /// Default is `true`.
        merge_onto_cache: bool,

        /// If this option is enabled, the imported settings will be treated as a 'patch' before
        /// being applied. If disabled, the imported settings will directly overwrite existing
        /// ones, affecting all properties in the archive even if the archive content hasn't
        /// actually changed.
        ///
        /// Default is `true`.
        apply_as_patch: bool,
    }
874
875    impl<'a> ImportOnDrop<'a> {
876        pub(super) fn new(inner: &'a Inner, archive: Archive) -> Self {
877            Self {
878                inner,
879                archive: ManuallyDrop::new(archive),
880                merge_onto_cache: true,
881                apply_as_patch: true,
882            }
883        }
884    }
885
886    impl<'a> Drop for ImportOnDrop<'a> {
887        fn drop(&mut self) {
888            // SAFETY: Typical `ManuallyDrop` usage.
889            let mut imported = unsafe { ManuallyDrop::take(&mut self.archive) };
890            let this = self.inner;
891
892            #[cfg(feature = "crypt")]
893            let key_loader = Inner::crypt_key_loader(&this.crypt_key);
894
895            let import_archive = |archive: &Archive| {
896                for group in this.all_groups.read().values() {
897                    let path = &group.context.path;
898                    let path = path.iter();
899                    let Some(node) = archive.find_path(path) else { continue };
900
901                    if Inner::load_node(
902                        &group.context,
903                        node,
904                        &**this.monitor.read(),
905                        #[cfg(feature = "crypt")]
906                        key_loader,
907                    ) {
908                        group.evt_on_update.notify();
909                    }
910                }
911            };
912
913            let mut self_archive = this.archive.write();
914            if self.apply_as_patch {
915                let patch = self_archive.create_patch(&mut imported);
916                import_archive(&patch);
917
918                if self.merge_onto_cache {
919                    self_archive.merge_from(patch);
920                } else {
921                    imported.merge_from(patch);
922                    *self_archive = imported;
923                }
924            } else {
925                if self.merge_onto_cache {
926                    self_archive.merge_from(imported);
927                } else {
928                    *self_archive = imported;
929                }
930
931                import_archive(&self_archive);
932            }
933        }
934    }
935
936    /* ------------------------------------ Export Operation ------------------------------------ */
    /// Options for exporting a storage's config groups into an [`Archive`].
    ///
    /// The export itself is performed by `collect`.
    #[derive(Setters)]
    pub struct ExportTask<'a> {
        /// Storage internals the export reads from.
        #[setters(skip)]
        inner: &'a Inner,

        /// On export, the storage gathers only active instances of config groups. If this is set
        /// to true, the collected results will be merged with the existing dump cache,
        /// preserving the archive data of uninitialized config groups.
        ///
        /// If set to false, only active config groups will be exported.
        ///
        /// Default is `true`
        merge_onto_dumped: bool,

        /// When this option is true, the storage will overwrite the import cache with the
        /// exported data. This will influence the creation of the next config group.
        ///
        /// Default is `true`
        replace_import_cache: bool,
    }
957
958    impl<'a> ExportTask<'a> {
959        pub(super) fn new(inner: &'a Inner) -> Self {
960            Self { inner, merge_onto_dumped: true, replace_import_cache: true }
961        }
962
963        /// Performs export operation with given settings
964        pub fn collect(self) -> Archive {
965            let mut archive = Archive::default();
966            let this = self.inner;
967
968            #[cfg(feature = "crypt")]
969            let key_loader = Inner::crypt_key_loader(&this.crypt_key);
970
971            for group in this.all_groups.read().values() {
972                Inner::dump_node(
973                    &group.context,
974                    &mut archive,
975                    #[cfg(feature = "crypt")]
976                    key_loader,
977                );
978            }
979
980            let mut self_archive = this.archive.write();
981            if !self.merge_onto_dumped {
982                if self.replace_import_cache {
983                    *self_archive = archive;
984                    self_archive.clone()
985                } else {
986                    archive
987                }
988            } else if self.replace_import_cache {
989                self_archive.merge_from(archive);
990                self_archive.clone()
991            } else {
992                archive.merge(self_archive.clone())
993            }
994        }
995    }
996}
997
/// Atomically replaceable [`Storage`] handle, built on `arc_swap::ArcSwap`.
///
/// Only compiled when the `arc-swap` feature is enabled.
#[cfg(feature = "arc-swap")]
pub mod atomic {
    use arc_swap::ArcSwap;

    use super::{inner::Inner, Storage};

    /// A slot holding a [`Storage`] that can be loaded and replaced through `&self`.
    pub struct AtomicStorageArc(ArcSwap<Inner>);

    impl AtomicStorageArc {
        /// Returns a handle to the storage currently held in the slot.
        pub fn load(&self) -> Storage {
            let current = self.0.load_full();
            Storage(current)
        }

        /// Puts `storage` into the slot, replacing the one held before.
        pub fn store(&self, storage: Storage) {
            let replacement = storage.0;
            self.0.store(replacement)
        }
    }

    impl Default for AtomicStorageArc {
        /// Creates a slot holding a default-constructed [`Storage`].
        fn default() -> Self {
            Storage::default().into()
        }
    }

    impl From<Storage> for AtomicStorageArc {
        /// Wraps an existing storage handle in a slot.
        fn from(value: Storage) -> Self {
            let swap = ArcSwap::new(value.0);
            Self(swap)
        }
    }
}