// config_it/config/storage.rs
1//! The `storage` module provides a high-level interface to interact with configuration data,
2//! ensuring safety, flexibility, and efficiency.
3//!
4//! This module primarily revolves around the `Storage` struct, which serves as the main access
5//! point for users to interact with the underlying storage system. By abstracting intricate
6//! operations into straightforward methods, it simplifies user interaction with stored
7//! configuration data.
8//!
9//! Key features include:
10//! - **Data Retrieval and Creation**: Safely find or create items with `find_or_create`.
11//! - **Data Import/Export**: Handle complex serialization and deserialization logic seamlessly with
12//! `import` and `exporter`.
13//! - **Monitoring**: Integrate external monitoring systems and receive updates about storage
14//! modifications using the `replace_monitor`, `unset_monitor`, and `notify_editions` methods.
//! - **Encryption Support**: Securely encrypt data (when the encryption feature is enabled) using
//! `set_crypt_key`.
17
18use std::{
19 any::{Any, TypeId},
20 mem::replace,
21 sync::{
22 atomic::{AtomicU64, Ordering},
23 Arc, Weak,
24 },
25};
26
27use strseq::SharedStringSequence;
28
29use crate::{
30 config::{entity, noti},
31 shared::{archive, GroupID, ItemID, PathHash},
32};
33
34use super::{
35 entity::EntityEventHook,
36 group::{self, GroupContext},
37};
38
39/* ---------------------------------------------------------------------------------------------- */
40/* STORAGE MONITOR TRAIT */
41/* ---------------------------------------------------------------------------------------------- */
42
/// Monitors every storage action. If a monitor tracks every event of a single storage, it can
/// replicate the internal state perfectly.
///
/// A monitor should properly handle property flags, such as `ADMIN`, `READONLY`, `SECRET`, etc ...
///
/// Blocking behavior may cause deadlock, thus monitors should be implemented in a non-blocking
/// manner (e.g. forwarding events to a channel).
pub trait Monitor: Send + Sync + 'static {
    /// Called when a new group is added.
    fn group_added(&mut self, group_id: GroupID, group: &Arc<GroupContext>) {
        let _ = (group_id, group);
    }

    /// Called when a group is removed. May be called with a falsy `group_id` if the monitor is
    /// being replaced.
    fn group_removed(&mut self, group_id: GroupID) {
        let _ = group_id;
    }

    /// Called when any entity value is updated. May be called with a falsy `group_id` if the
    /// monitor is being replaced.
    ///
    /// Since this is called frequently compared to group modification commands, it receives an
    /// immutable self reference. Therefore, all state modification should be handled with
    /// interior mutability!
    fn entity_value_updated(&self, group_id: GroupID, item_id: ItemID) {
        let _ = (group_id, item_id);
    }
}
71
72/* ---------------------------------------------------------------------------------------------- */
73/* STORAGE API */
74/* ---------------------------------------------------------------------------------------------- */
/// Provides a high-level, thread-safe interface to the configuration storage system.
///
/// `Storage` acts as the primary access point for users to interact with the underlying storage
/// system. It abstracts away the intricacies of direct storage interactions by wrapping around the
/// `inner::Inner` type, ensuring concurrent safety.
///
/// With `Storage`, users can seamlessly perform read and write operations on their configuration
/// data without worrying about potential concurrency issues. This design ensures that the storage
/// system remains robust and efficient, even in multi-threaded environments.
///
/// Cloning a `Storage` is cheap (a single `Arc` refcount bump) and yields another handle to the
/// *same* underlying storage.
///
/// # Features:
/// - **Thread Safety**: Guarantees safe concurrent access to the configuration storage.
/// - **High-level Interface**: Abstracts the complexities of direct storage interactions, offering
///   a user-friendly API.
#[derive(Debug, Default, Clone)]
pub struct Storage(Arc<inner::Inner>);
91
/// Error returned by [`Storage::find`].
#[derive(thiserror::Error, Debug)]
pub enum GroupFindError {
    /// No group is registered under the given path.
    #[error("Given path was not found")]
    PathNotFound,
    /// A group exists at the path, but was registered with a different template type.
    #[error("Type ID mismatch from original registration")]
    MismatchedTypeID,
}
99
/// Error returned by [`Storage::find_or_create`].
#[derive(thiserror::Error, Debug)]
pub enum GroupFindOrCreateError {
    /// A group exists at the path, but was registered with a different template type.
    #[error("Type ID mismatch from original registration")]
    MismatchedTypeID,
}
105
106#[derive(thiserror::Error, Debug)]
107pub enum GroupCreationError {
108 #[error("Path name duplicated, found early! Path was: {0:?}")]
109 PathCollisionEarly(SharedStringSequence),
110 #[error("Path name duplication found during registeration. Path was: {0:?}")]
111 PathCollisionRace(SharedStringSequence),
112}
113
114impl Storage {
    /// Gets the ID of this storage instance. The ID is unique per single program instance.
    pub fn storage_id(&self) -> crate::shared::StorageID {
        self.0.id
    }
119
120 /// Searches for an existing item of type `T` in the storage, or creates a new one if it doesn't
121 /// exist.
122 ///
123 /// # Arguments
124 ///
125 /// * `T` - The type of item to search for or create.
126 ///
127 /// # Returns
128 ///
129 /// A reference to the existing or newly created item of type `T`. Value remains in template
130 /// default until you call first `update()` on it.
131 pub fn find_or_create<'a, T>(
132 &self,
133 path: impl IntoIterator<Item = impl AsRef<str> + 'a>,
134 ) -> Result<group::Group<T>, GroupFindOrCreateError>
135 where
136 T: group::Template,
137 {
138 let keys = SharedStringSequence::from_iter(path);
139 let path_hash = PathHash::new(keys.iter());
140
141 use GroupCreationError as GCE;
142 use GroupFindError as GFE;
143 use GroupFindOrCreateError as GFOE;
144
145 loop {
146 match self.find(path_hash) {
147 Ok(found) => break Ok(found),
148 Err(GFE::MismatchedTypeID) => break Err(GFOE::MismatchedTypeID),
149 Err(GFE::PathNotFound) => {}
150 }
151
152 match self.create_impl::<T>(keys.clone()) {
153 Ok(created) => break Ok(created),
154
155 // Simply retry on path collision
156 Err(GCE::PathCollisionEarly(_) | GCE::PathCollisionRace(_)) => continue,
157 }
158 }
159 }
160
161 /// Find a group with the given path and template type.
162 ///
163 /// # Arguments
164 ///
165 /// * `path` - The path of the group to find.
166 ///
167 /// # Type Parameters
168 ///
169 /// * `T` - The type of the template to use for the group.
170 ///
171 /// # Returns
172 ///
173 /// Returns a `Result` containing the found group or a `GroupFindError` if the group was not
174 /// found or if the template type does not match the expected type. Value remains in template
175 /// default until you call first `update()` on it.
176 pub fn find<T: group::Template>(
177 &self,
178 path: impl Into<PathHash>,
179 ) -> Result<group::Group<T>, GroupFindError> {
180 let path_hash = path.into();
181
182 if let Some(group) = self.0.find_group(&path_hash) {
183 if group.template_type_id != std::any::TypeId::of::<T>() {
184 Err(GroupFindError::MismatchedTypeID)
185 } else if let Some(anchor) = group.w_unregister_hook.upgrade() {
186 Ok(group::Group::create_with__(group, anchor))
187 } else {
188 // This is corner case where group was disposed during `find_group` is invoked.
189 Err(GroupFindError::PathNotFound)
190 }
191 } else {
192 Err(GroupFindError::PathNotFound)
193 }
194 }
195
196 /// Creates a new instance of the `Storage` struct with the specified type parameter. Value
197 /// remains in template default until you call first `update()` on it.
198 pub fn create<'a, T>(
199 &self,
200 path: impl IntoIterator<Item = &'a (impl AsRef<str> + ?Sized + 'a)>,
201 ) -> Result<group::Group<T>, GroupCreationError>
202 where
203 T: group::Template,
204 {
205 self.create_impl(path.into_iter().collect())
206 }
207
    /// Registers a new group context for template `T` under `path`.
    ///
    /// Allocates a fresh `GroupID`, builds per-property entity data with a value-change hook,
    /// wires the unregister anchor (which removes the group when the last `Group<T>` handle
    /// drops), and publishes the context through `inner::Inner::register_group`.
    fn create_impl<T: group::Template>(
        &self,
        path: SharedStringSequence,
    ) -> Result<group::Group<T>, GroupCreationError> {
        // Empty paths or empty path components are not representable.
        assert!(!path.is_empty());
        assert!(path.iter().all(|x| !x.is_empty()));

        let path_hash = PathHash::new(path.iter());

        // Naively check if there's already existing group with same path.
        if self.0.find_group(&path_hash).is_some() {
            return Err(GroupCreationError::PathCollisionEarly(path));
        }

        // This ID may not be used if group creation failed ... it's generally okay since the
        // 64-bit incremental ID space is effectively inexhaustible.
        let register_id = GroupID::new_unique_incremental();
        let entity_hook = Arc::new(EntityHookImpl { register_id, inner: Arc::downgrade(&self.0) });

        // Template codegen must assign contiguous, ascending property indices.
        debug_assert!(
            T::props__().windows(2).all(|x| x[0].index + 1 == x[1].index),
            "Something wrong with property generation"
        );

        let sources: Vec<_> = T::props__()
            .iter()
            .map(|prop| entity::EntityData::new(prop, entity_hook.clone()))
            .collect();

        // Drops the group when the final group instance is dropped.
        let unregister_anchor = Arc::new(GroupUnregisterHook {
            register_id,
            path_hash,
            inner: Arc::downgrade(&self.0),
        });

        // Create core config set context with reflected target metadata set
        let tx_noti = noti::Sender::new();
        let context = Arc::new(GroupContext {
            group_id: register_id,
            template_type_id: TypeId::of::<T>(),
            template_name: T::template_name(),
            // Type-erased weak handle so `find` can revive a strong anchor for new handles.
            w_unregister_hook: Arc::downgrade(
                &(unregister_anchor.clone() as Arc<dyn Any + Send + Sync>),
            ),
            sources: sources.into(),
            version: AtomicU64::new(1), // NOTE: This will trigger initial check_update() always.
            update_receiver_channel: tx_noti.receiver(true),
            path: path.clone(),
        });

        self.0
            .register_group(path_hash, context, tx_noti)
            .map(|context| group::Group::create_with__(context, unregister_anchor))
    }
263
    /// Create internal archive export task.
    ///
    /// You should explicitly call `collect()` on the returned task to perform the export and
    /// retrieve the resulting archive.
    pub fn exporter(&self) -> inner::ExportTask {
        inner::ExportTask::new(&self.0)
    }
270
    /// Deserializes the archive into this storage.
    ///
    /// # Data Serialization Rules:
    /// - The root component is the first path component and is written as-is.
    /// - Subsequent path components must be prefixed with a `~` (tilde) character.
    /// - If not prefixed, they are treated as a field element of the enclosing path component.
    /// - A `~`-prefixed key nested *inside* a field value is NOT interpreted as a path
    ///   component (see `~this_is_not_treated_as_path` in the example below); components are
    ///   only recognized directly under another path component.
    ///
    /// # Example JSON structure:
    /// ```json
    /// {
    ///     "root_path": {
    ///         "~path_component": {
    ///             "field_name": "value",
    ///             "other_field": {
    ///                 "~this_is_not_treated_as_path": 123
    ///             }
    ///         },
    ///         "~another_path_component": {},
    ///         "field_name_of_root_path": "yay"
    ///     },
    ///     "another_root_path": {}
    /// }
    /// ```
    ///
    /// # Returns
    ///
    /// An instance of `ImportOnDrop` which performs the actual import when it is dropped;
    /// configure its behavior via its setter methods before letting it drop.
    pub fn import(&self, archive: archive::Archive) -> inner::ImportOnDrop {
        inner::ImportOnDrop::new(&self.0, archive)
    }
303
    /// Replaces the current monitor with the provided one.
    ///
    /// This function dumps the active list of groups to the new monitor sequentially. If the
    /// monitor is not efficiently implemented, this operation can significantly impact the
    /// performance of all storage consumers and replicators. Therefore, exercise caution when
    /// replacing the monitor on a storage instance that's under heavy use.
    ///
    /// # Arguments
    ///
    /// * `handler` - The new monitor to replace the current one.
    ///
    /// # Returns
    ///
    /// The previous monitor that has been replaced.
    pub fn replace_monitor(&self, handler: Box<impl Monitor>) -> Box<dyn Monitor> {
        self.0.replace_monitor(handler)
    }
321
    /// Unset monitor instance, installing a no-op monitor in its place. Unlike
    /// `replace_monitor`, this does not replay existing groups to the new monitor.
    pub fn unset_monitor(&self) {
        *self.0.monitor.write() = Box::new(inner::EmptyMonitor);
    }
326
327 /// Send monitor event to storage driver.
328 pub fn notify_editions(&self, items: impl IntoIterator<Item = GroupID>) {
329 for group in items {
330 self.0.notify_edition(group);
331 }
332 }
333
    /// Sets the encryption key.
    ///
    /// The provided bytes are hashed with SHA-256 to derive the 32-byte AES key. If no key is
    /// set before the first encryption or decryption operation, one is derived from the
    /// machine's unique identifier (UID) when the `crypt-machine-id` feature is enabled;
    /// otherwise no key is available and secret values are skipped during serialization.
    ///
    /// # Arguments
    ///
    /// * `key` - Byte slice representing the encryption key material.
    #[cfg(feature = "crypt")]
    pub fn set_crypt_key(&self, key: impl AsRef<[u8]>) {
        use sha2::{Digest, Sha256};
        let arr = &*Sha256::new().chain_update(key).finalize();
        self.0.crypt_key.write().replace(std::array::from_fn(|index| arr[index]));
    }
347}
348
349/* ---------------------------------------------------------------------------------------------- */
350/* INTERNALS */
351/* ---------------------------------------------------------------------------------------------- */
352
/// Anchor created in `create_impl`; when the last strong reference (held by `Group` handles)
/// drops, this unregisters the group from its owning storage.
struct GroupUnregisterHook {
    register_id: GroupID,
    path_hash: PathHash,
    inner: Weak<inner::Inner>,
}
358
359impl Drop for GroupUnregisterHook {
360 fn drop(&mut self) {
361 // Just ignore result. If channel was closed before the set is unregistered,
362 // it's ok to ignore this operation silently.
363 let Some(inner) = self.inner.upgrade() else { return };
364 inner.unregister_group(self.register_id, self.path_hash);
365 }
366}
367
/// Per-group hook handed to each `EntityData`; relays value-change events back to the owning
/// storage together with the group's ID.
struct EntityHookImpl {
    register_id: GroupID,
    inner: Weak<inner::Inner>,
}
372
373impl EntityEventHook for EntityHookImpl {
374 fn on_value_changed(&self, data: &entity::EntityData, silent: bool) {
375 // Update notification is transient, thus when storage driver is busy, it can
376 // just be dropped.
377 let Some(inner) = self.inner.upgrade() else { return };
378 inner.on_value_update(self.register_id, data, silent);
379 }
380}
381
382mod inner {
383 use std::{collections::HashMap, mem::ManuallyDrop};
384
385 use derive_setters::Setters;
386 use parking_lot::RwLock;
387
388 use crate::{
389 config::entity::Entity,
390 shared::{archive::Archive, meta::MetaFlag, StorageID},
391 };
392
393 use super::*;
394
    /// Manages and drives internal storage events.
    ///
    /// Primarily responsible for handling update requests and orchestrating
    /// the underlying storage mechanisms.
    #[derive(cs::Debug)]
    pub(super) struct Inner {
        /// Unique(during runtime) identifier for this storage.
        pub id: StorageID,

        /// Maintains a registry of all configuration sets within this storage.
        ///
        /// The key is the group's unique identifier, `GroupID`.
        all_groups: RwLock<HashMap<GroupID, GroupRegistration>>,

        /// The single monitor currently attached to this storage. Every group add/remove and
        /// entity value update is forwarded to it; replace or clear it via
        /// `Storage::replace_monitor` / `Storage::unset_monitor`.
        #[debug(with = "fmt_monitor")]
        pub monitor: RwLock<Box<dyn Monitor>>,

        /// Keeps track of registered path hashes to quickly identify potential path name
        /// duplications.
        ///
        /// Uses the path's hash representation as the key and its corresponding `GroupID` as the
        /// value.
        path_hashes: RwLock<HashMap<PathHash, GroupID>>,

        /// Holds a cached version of the archive. This may include content for groups that are
        /// currently non-existent.
        pub archive: RwLock<archive::Archive>,

        /// AES-256 encryption key for securing data.
        ///
        /// This key is used when the encryption feature is enabled. It ensures that stored data is
        /// encrypted, adding an additional layer of security.
        #[cfg(feature = "crypt")]
        #[debug(with = "fmt_encryption_key")]
        pub crypt_key: RwLock<Option<[u8; 32]>>,
    }
436
    /// Debug-formats the monitor slot as `Some(<ptr>)` / `None`, where `None` means the slot
    /// still holds the default `EmptyMonitor`.
    fn fmt_monitor(
        monitor: &RwLock<Box<dyn Monitor>>,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        let ptr = &**monitor.read() as *const _;

        // Extract data pointers of `ptr`, and `&EmptyMonitor`
        // NOTE(review): `EmptyMonitor` is a zero-sized type, so this compares the address of a
        // boxed ZST against a promoted `&EmptyMonitor` static — these are not guaranteed to
        // coincide, so `Some(..)` may be reported even while the empty monitor is installed.
        // Confirm this is only ever used for debug output.
        let exists = ptr as *const () != &EmptyMonitor as *const _ as *const ();

        write!(f, "{:?}", exists.then_some(ptr))
    }
448
449 #[cfg(feature = "crypt")]
450 fn fmt_encryption_key(
451 key: &RwLock<Option<[u8; 32]>>,
452 f: &mut std::fmt::Formatter<'_>,
453 ) -> std::fmt::Result {
454 let exists = key.read().is_some();
455 write!(f, "{:?}", exists)
456 }
457
    /// A dummy monitor class, which represents an empty (no-op) monitor.
    pub(super) struct EmptyMonitor;
    impl Monitor for EmptyMonitor {}
461
    impl Default for Inner {
        // A default storage starts with the no-op `EmptyMonitor` installed.
        fn default() -> Self {
            Self::new(Box::new(EmptyMonitor))
        }
    }
467
    /// Registry entry for a single group: its shared context plus the sender side of its
    /// update-notification channel.
    #[derive(Debug)]
    struct GroupRegistration {
        context: Arc<GroupContext>,
        evt_on_update: noti::Sender,
    }
473
474 impl Inner {
        /// Creates a new `Inner` with the given monitor installed and a freshly allocated
        /// storage ID.
        pub fn new(monitor: Box<dyn Monitor>) -> Self {
            Self {
                id: StorageID::new_unique_incremental(),
                monitor: RwLock::new(monitor),
                archive: Default::default(),
                #[cfg(feature = "crypt")]
                crypt_key: Default::default(),

                // Both registries start empty; entries are added on demand by `register_group`.
                all_groups: Default::default(),
                path_hashes: Default::default(),
            }
        }
488
489 pub fn notify_edition(&self, group_id: GroupID) {
490 if let Some(group) = self.all_groups.read().get(&group_id) {
491 group.context.version.fetch_add(1, Ordering::Relaxed);
492 group.evt_on_update.notify();
493 }
494 }
495
496 pub fn find_group(&self, path_hash: &PathHash) -> Option<Arc<GroupContext>> {
497 self.path_hashes
498 .read()
499 .get(path_hash)
500 .and_then(|id| self.all_groups.read().get(id).map(|x| x.context.clone()))
501 }
502
        /// Registers a new group context under `path_hash`.
        ///
        /// On success, any cached archive content for the path is loaded into the group first,
        /// and then the monitor is notified via `group_added`. If another thread wins the race
        /// for the same path hash, this registration is rolled back and
        /// `PathCollisionRace` is returned.
        pub fn register_group(
            &self,
            path_hash: PathHash,
            context: Arc<GroupContext>,
            evt_on_update: noti::Sender,
        ) -> Result<Arc<GroupContext>, GroupCreationError> {
            // Path-hash to GroupID mappings might experience collisions due to simultaneous access.
            // To ensure integrity, only consider the group insertion successful when its path_hash
            // is successfully registered to its corresponding group ID.
            let group_id = context.group_id;
            let inserted = context.clone();
            let rg = GroupRegistration { context, evt_on_update };

            // If the path already exists in the archive, load the corresponding node.
            // (EmptyMonitor here: the real monitor is only told about the group once the
            // registration is confirmed below.)
            if let Some(node) = self.archive.read().find_path(rg.context.path.iter()) {
                Self::load_node(
                    &rg.context,
                    node,
                    &EmptyMonitor,
                    #[cfg(feature = "crypt")]
                    Self::crypt_key_loader(&self.crypt_key),
                );
            }

            // Ensure that the Group ID is unique.
            assert!(
                self.all_groups.write().insert(group_id, rg).is_none(),
                "Group IDs must be unique." // Ensure we haven't exhausted all 2^64 possibilities.
            );

            // Check for path-hash collisions. In the rare case where a collision occurs due to
            // another thread registering the same path-hash, we remove the current group
            // registration and return an error.
            if self.path_hashes.write().entry(path_hash).or_insert(group_id) != &group_id {
                self.all_groups.write().remove(&group_id);
                return Err(GroupCreationError::PathCollisionRace(inserted.path.clone()));
            }

            // Notify the monitor that a new group has been added.
            self.monitor.write().group_added(group_id, &inserted);
            Ok(inserted)
        }
545
        /// Removes a group registration, dumping its final values into the cached archive and
        /// notifying the monitor via `group_removed`.
        ///
        /// No-ops when the `(group_id, path_hash)` pair is not the currently registered one —
        /// e.g. when invoked for the rolled-back registration of a lost `register_group` race.
        pub fn unregister_group(&self, group_id: GroupID, path_hash: PathHash) {
            {
                let mut path_hashes = self.path_hashes.write();
                if !path_hashes.get(&path_hash).is_some_and(|v| *v == group_id) {
                    tr::debug!(?group_id, ?path_hash, "unregister_group() call to unexist group");
                    return;
                };

                path_hashes.remove(&path_hash);
            }

            if let Some(ctx) = self.all_groups.write().remove(&group_id) {
                let _s = tr::info_span!(
                    "unregister_group()",
                    template = ?ctx.context.template_name,
                    path = ?ctx.context.path
                );

                // Consider the removal valid only if the group was previously and validly created.
                // The method `all_groups.remove` might return `None` if the group was not
                // successfully registered during `register_group`. If this happens, this function
                // gets invoked during the disposal of `GroupUnregisterHook` within the
                // `create_impl` function.

                // For valid removals, add contents to the cached archive.
                Self::dump_node(
                    &ctx.context,
                    &mut self.archive.write(),
                    #[cfg(feature = "crypt")]
                    Self::crypt_key_loader(&self.crypt_key),
                );

                // Notify about the removal
                self.monitor.write().group_removed(group_id);
            }
        }
582
583 pub fn on_value_update(&self, group_id: GroupID, data: &entity::EntityData, silent: bool) {
584 // Monitor should always be notified on value update, regardless of silent flag
585 self.monitor.read().entity_value_updated(group_id, data.id);
586
587 // If silent flag is set, skip internal notify to other instances.
588 if silent {
589 return;
590 }
591
592 // This is trivially fallible operation.
593 if let Some(group) = self.all_groups.read().get(&group_id) {
594 group.context.version.fetch_add(1, Ordering::Relaxed);
595 group.evt_on_update.notify();
596 }
597 }
598
        /// Installs `new_monitor`, replays `group_added` for every currently registered group
        /// to it, and returns the previously installed monitor.
        pub fn replace_monitor(&self, new_monitor: Box<dyn Monitor>) -> Box<dyn Monitor> {
            // At the start of the operation, we replace the monitor. This means that before we add all
            // valid groups to the new monitor, there may be notifications about group removals or updates.
            // Without redesigning the entire storage system into a more expensive locking mechanism,
            // there's no way to avoid this. We assume that monitor implementation providers will handle
            // any incorrect updates gracefully and thus, we are ignoring this case.

            let old_monitor = replace(&mut *self.monitor.write(), new_monitor);

            // NOTE(review): an earlier comment here reasoned about DashMap shard iteration,
            // but `path_hashes` is a plain `RwLock<HashMap>` now: the read guard below locks
            // the whole map for the duration of the loop, so no insertion/removal interleaves
            // with this replay. Mind that `monitor.write()` is acquired repeatedly inside the
            // loop while `path_hashes` stays read-locked — confirm no monitor implementation
            // re-enters the storage from `group_added`, which could deadlock.
            for group_id in self.path_hashes.read().values() {
                let all_groups = self.all_groups.read();
                let Some(group) = all_groups.get(group_id) else {
                    unreachable!(
                        "As long as the group_id is found from path_hashes, \
                        the group must be found from `all_groups`."
                    )
                };

                // Replay the addition of every live group to the freshly installed monitor.
                self.monitor.write().group_added(*group_id, &group.context);
            }

            old_monitor
        }
632
        /// ⚠️ **CAUTION!** Do NOT alter this literal! Any modification will DESTROY all existing
        /// encrypted data irreparably! ⚠️
        ///
        /// The PREFIX itself is valid base64 — a decorated spelling of the word 'secret'. It
        /// tags archived string values that hold encrypted payloads.
        #[cfg(feature = "crypt")]
        const CRYPT_PREFIX: &'static str = "+/+sE/cRE+t//";
639
        /// Returns a copyable closure yielding the effective crypt key: the explicitly set key
        /// when present, otherwise the machine-derived system key (which may be absent).
        #[cfg(feature = "crypt")]
        fn crypt_key_loader(
            key: &RwLock<Option<[u8; 32]>>,
        ) -> impl Fn() -> Option<[u8; 32]> + '_ + Copy {
            // `Option<[u8; 32]>` is `Copy`, so the value is copied out through the read guard.
            || key.read().or_else(Self::crypt_sys_key)
        }
646
        /// Uses hard coded NONCE, just to run the algorithm.
        ///
        /// NOTE(review): a fixed nonce with a reused key weakens AES-GCM (identical plaintexts
        /// yield identical ciphertexts) — acceptable only for obfuscation-grade secrecy.
        #[cfg(feature = "crypt")]
        const CRYPT_NONCE: [u8; 12] = [15, 43, 5, 12, 6, 66, 126, 231, 141, 18, 33, 71];
650
        /// Derives the machine-specific fallback key — SHA-256 of the machine UID — cached for
        /// the lifetime of the process. Returns `None` when the `crypt-machine-id` feature is
        /// disabled or the UID cannot be retrieved.
        #[cfg(feature = "crypt")]
        fn crypt_sys_key() -> Option<[u8; 32]> {
            #[cfg(feature = "crypt-machine-id")]
            {
                use std::sync::OnceLock;
                static CACHED: OnceLock<Option<[u8; 32]>> = OnceLock::new();

                *CACHED.get_or_init(|| {
                    machine_uid::get().ok().map(|uid| {
                        use sha2::{Digest, Sha256};
                        let arr = &*Sha256::new().chain_update(uid).finalize();
                        std::array::from_fn(|index| arr[index])
                    })
                })
            }

            #[cfg(not(feature = "crypt-machine-id"))]
            {
                None
            }
        }
672
        /// Serializes one group's exportable properties into `archive` at the group's path.
        ///
        /// Properties flagged `NO_EXPORT` are skipped. With the `crypt` feature, `SECRET`
        /// properties are JSON-encoded, AES-256-GCM encrypted, base64-encoded and tagged with
        /// `CRYPT_PREFIX`; when no key is available (or `crypt` is disabled) secret values are
        /// skipped entirely so they never leave the process in plain text.
        fn dump_node(
            ctx: &GroupContext,
            archive: &mut archive::Archive,
            #[cfg(feature = "crypt")] crypt_key_loader: impl Fn() -> Option<[u8; 32]>,
        ) {
            let _s = tr::info_span!("dump_node()", template=?ctx.template_name, path=?ctx.path);

            let paths = ctx.path.iter();
            let node = archive.find_or_create_path_mut(paths);

            // Clear existing values before dumping.
            node.values.clear();

            // Lazily fetched crypt key: `None` = not fetched yet, `Some(Err(()))` = unavailable.
            #[cfg(feature = "crypt")]
            let mut crypt_key = None;

            '_outer: for (meta, val) in ctx
                .sources
                .iter()
                .map(|e| e.property_value())
                .filter(|(meta, _)| !meta.metadata.flags.contains(MetaFlag::NO_EXPORT))
            {
                let _s = tr::info_span!("node dump", varname=?meta.varname);
                let dst = node.values.entry(meta.name.into()).or_default();

                #[cfg(feature = "crypt")]
                'encryption: {
                    use aes_gcm::aead::{Aead, KeyInit};
                    use base64::prelude::*;

                    // Non-secret properties fall through to the plain JSON path below.
                    if !meta.metadata.flags.contains(MetaFlag::SECRET) {
                        break 'encryption;
                    }

                    if crypt_key.is_none() {
                        crypt_key = Some(crypt_key_loader().ok_or(()));
                    }

                    // Check if key was correctly loaded. If not, skip serialization itself to not
                    // export delicate data.
                    let Ok(key) = crypt_key.as_ref().unwrap() else {
                        tr::warn!("Crypt key missing. Skipping secret data serialization.");
                        continue '_outer;
                    };
                    let Ok(json) = serde_json::to_vec(val.as_serialize()) else {
                        tr::warn!("JSON dump failed");
                        continue '_outer;
                    };

                    let cipher = aes_gcm::Aes256Gcm::new(key.into());
                    let Ok(enc) = cipher.encrypt(&Self::CRYPT_NONCE.into(), &json[..]) else {
                        tr::warn!("Encryption failed");
                        continue '_outer;
                    };

                    // Store as `CRYPT_PREFIX` + base64(ciphertext) so `load_node` can detect it.
                    *dst = serde_json::Value::String(format!(
                        "{}{}",
                        Self::CRYPT_PREFIX,
                        BASE64_STANDARD_NO_PAD.encode(&enc)
                    ));

                    continue '_outer;
                }

                #[cfg(not(feature = "crypt"))]
                if meta.metadata.flags.contains(MetaFlag::SECRET) {
                    tr::warn!("`crypt` Feature disabled: Skipping secret data serialization.");
                    continue;
                }

                match serde_json::to_value(val.as_serialize()) {
                    Ok(val) => *dst = val,
                    Err(error) => {
                        tr::warn!(%error, "JSON dump failed");
                    }
                }
            }
        }
751
        /// Deserializes values from `node` into the group's entities, returning whether any
        /// value was actually applied.
        ///
        /// Properties flagged `NO_IMPORT` are skipped. With the `crypt` feature, `SECRET`
        /// string values tagged with `CRYPT_PREFIX` are base64-decoded and decrypted first; on
        /// any decryption problem the raw value is served as-is. `monitor` receives
        /// `entity_value_updated` for each applied value.
        fn load_node(
            ctx: &GroupContext,
            node: &archive::Archive,
            monitor: &dyn Monitor,
            #[cfg(feature = "crypt")] crypt_key_loader: impl Fn() -> Option<[u8; 32]>,
        ) -> bool {
            let _s = tr::info_span!("load_node()", template=?ctx.template_name, path=?ctx.path);
            let mut has_update = false;

            // Lazily fetched crypt key: `None` = not fetched yet, `Some(Err(()))` = unavailable.
            #[cfg(feature = "crypt")]
            let mut crypt_key = None;

            '_outer: for (elem, de) in ctx
                .sources
                .iter()
                .filter(|e| !e.meta.flags.contains(MetaFlag::NO_IMPORT))
                .filter_map(|x| node.values.get(x.meta.name).map(|o| (x, o)))
            {
                let _s = tr::info_span!("node load", varname=?elem.meta.varname);

                #[allow(unused_mut)]
                let mut update_result = None;

                #[cfg(feature = "crypt")]
                'decryption: {
                    use aes_gcm::aead::{Aead, KeyInit};
                    use base64::prelude::*;

                    if !elem.meta.flags.contains(MetaFlag::SECRET) {
                        // Just try to deserialize from plain value.
                        break 'decryption;
                    }

                    // Non-string value is not an encrypted property. Serve as-is.
                    let Some(str) = de.as_str() else { break 'decryption };

                    // Verify if it is an encrypted string repr.
                    if !str.starts_with(Self::CRYPT_PREFIX) {
                        tr::debug!("Non-encrypted string repr. serve as-is.");
                        break 'decryption;
                    }

                    let str = &str[Self::CRYPT_PREFIX.len()..];
                    let Ok(bin) = BASE64_STANDARD_NO_PAD.decode(str).map_err(|error| {
                        tr::debug!(
                            %error,
                            "Crypt-prefixed string is not valid base64. \
                            Trying to parse as plain string."
                        )
                    }) else {
                        break 'decryption;
                    };

                    if crypt_key.is_none() {
                        crypt_key = Some(crypt_key_loader().ok_or(()));
                    }

                    // Check if key was correctly loaded. If not, just try to parse string as plain
                    // string ... which won't be very useful though.
                    let Ok(key) = crypt_key.as_ref().unwrap() else {
                        tr::warn!("Crypt key missing. Skipping secret data serialization.");
                        break 'decryption;
                    };

                    let cipher = aes_gcm::Aes256Gcm::new(key.into());
                    let Ok(json) =
                        cipher.decrypt(&Self::CRYPT_NONCE.into(), &bin[..]).map_err(|error| {
                            tr::warn!(%error, "Failed to decrypt secret data");
                        })
                    else {
                        break 'decryption;
                    };

                    update_result = Some(
                        elem.update_value_from(&mut serde_json::Deserializer::from_slice(&json)),
                    );
                }

                match update_result.unwrap_or_else(|| elem.update_value_from(de)) {
                    Ok(_) => {
                        has_update = true;
                        monitor.entity_value_updated(ctx.group_id, elem.id);
                    }
                    Err(error) => {
                        tr::warn!(%error, "Element value update error during node loading")
                    }
                }
            }

            if has_update {
                // Bump the version so the client side's next `update()` call observes the
                // freshly loaded values.
                ctx.version.fetch_add(1, Ordering::Release);
            }

            has_update
        }
849 }
850
851 /* ------------------------------------ Import Operation ------------------------------------ */
    /// Deferred import: applies the held archive to the storage when this value is dropped.
    ///
    /// The options below are configured through the `Setters`-generated builder-style methods.
    #[derive(Setters)]
    #[setters(borrow_self)]
    pub struct ImportOnDrop<'a> {
        #[setters(skip)]
        inner: &'a Inner,

        // `ManuallyDrop` so `Drop::drop` can move the archive out of `&mut self`.
        #[setters(skip)]
        archive: ManuallyDrop<Archive>,

        /// When set to true, the imported config will be merged with the existing cache. This is
        /// typically useful to prevent unsaved archive entities from being overwritten.
        ///
        /// Default is `true`.
        merge_onto_cache: bool,

        /// If this option is enabled, the imported settings will be treated as a 'patch' before
        /// being applied. If disabled, the imported settings will directly overwrite existing
        /// ones, affecting all properties in the archive even if the archive content hasn't
        /// actually changed.
        ///
        /// Default is `true`.
        apply_as_patch: bool,
    }
874
875 impl<'a> ImportOnDrop<'a> {
876 pub(super) fn new(inner: &'a Inner, archive: Archive) -> Self {
877 Self {
878 inner,
879 archive: ManuallyDrop::new(archive),
880 merge_onto_cache: true,
881 apply_as_patch: true,
882 }
883 }
884 }
885
    impl<'a> Drop for ImportOnDrop<'a> {
        /// Performs the deferred import according to `apply_as_patch` / `merge_onto_cache`.
        fn drop(&mut self) {
            // SAFETY: Typical `ManuallyDrop` usage — the archive is taken exactly once, here
            // in `drop`, and never accessed again afterwards.
            let mut imported = unsafe { ManuallyDrop::take(&mut self.archive) };
            let this = self.inner;

            #[cfg(feature = "crypt")]
            let key_loader = Inner::crypt_key_loader(&this.crypt_key);

            // Applies `archive` to every live group whose path exists in it, firing the
            // group's update channel whenever anything changed.
            let import_archive = |archive: &Archive| {
                for group in this.all_groups.read().values() {
                    let path = &group.context.path;
                    let path = path.iter();
                    let Some(node) = archive.find_path(path) else { continue };

                    if Inner::load_node(
                        &group.context,
                        node,
                        &**this.monitor.read(),
                        #[cfg(feature = "crypt")]
                        key_loader,
                    ) {
                        group.evt_on_update.notify();
                    }
                }
            };

            let mut self_archive = this.archive.write();
            if self.apply_as_patch {
                // Only the difference against the current cache is applied to live groups.
                let patch = self_archive.create_patch(&mut imported);
                import_archive(&patch);

                if self.merge_onto_cache {
                    self_archive.merge_from(patch);
                } else {
                    imported.merge_from(patch);
                    *self_archive = imported;
                }
            } else {
                if self.merge_onto_cache {
                    self_archive.merge_from(imported);
                } else {
                    *self_archive = imported;
                }

                // Without patching, groups are (re)loaded from the full resulting cache.
                import_archive(&self_archive);
            }
        }
    }
935
936 /* ------------------------------------ Export Operation ------------------------------------ */
    /// Deferred export: gathers active groups into an `Archive` when `collect()` is called.
    ///
    /// The options below are configured through the `Setters`-generated builder-style methods.
    #[derive(Setters)]
    pub struct ExportTask<'a> {
        #[setters(skip)]
        inner: &'a Inner,

        /// On export, the storage gathers only active instances of config groups. If this is set
        /// to true, the collected results will be merged with the existing dump cache, preserving
        /// the archive data of uninitialized config groups.
        ///
        /// If set to false, only active config groups will be exported.
        ///
        /// Default is `true`
        merge_onto_dumped: bool,

        /// When this option is true, the storage will overwrite the import cache with the
        /// exported data. This will influence the creation of the next config group.
        ///
        /// Default is `true`
        replace_import_cache: bool,
    }
957
    impl<'a> ExportTask<'a> {
        /// Creates an export task over `inner`; both options default to `true`.
        pub(super) fn new(inner: &'a Inner) -> Self {
            Self { inner, merge_onto_dumped: true, replace_import_cache: true }
        }

        /// Performs export operation with given settings
        pub fn collect(self) -> Archive {
            // Dump every currently-active group into a fresh archive.
            let mut archive = Archive::default();
            let this = self.inner;

            #[cfg(feature = "crypt")]
            let key_loader = Inner::crypt_key_loader(&this.crypt_key);

            for group in this.all_groups.read().values() {
                Inner::dump_node(
                    &group.context,
                    &mut archive,
                    #[cfg(feature = "crypt")]
                    key_loader,
                );
            }

            // Combine the fresh dump with the cached archive according to the task options.
            let mut self_archive = this.archive.write();
            if !self.merge_onto_dumped {
                if self.replace_import_cache {
                    // Cache becomes exactly the fresh dump; return a copy of it.
                    *self_archive = archive;
                    self_archive.clone()
                } else {
                    archive
                }
            } else if self.replace_import_cache {
                // Fold the dump into the cache and return the combined result.
                self_archive.merge_from(archive);
                self_archive.clone()
            } else {
                // Combine with the cache without mutating the cache itself.
                archive.merge(self_archive.clone())
            }
        }
    }
996}
997
#[cfg(feature = "arc-swap")]
pub mod atomic {
    //! Atomically swappable `Storage` handle, available with the `arc-swap` feature.

    use arc_swap::ArcSwap;

    use super::Storage;

    /// A slot holding a `Storage` handle that can be loaded and replaced atomically, without
    /// locking.
    pub struct AtomicStorageArc(ArcSwap<super::inner::Inner>);

    impl From<Storage> for AtomicStorageArc {
        fn from(value: Storage) -> Self {
            Self(ArcSwap::new(value.0))
        }
    }

    impl Default for AtomicStorageArc {
        fn default() -> Self {
            Self::from(Storage::default())
        }
    }

    impl AtomicStorageArc {
        /// Loads a clone of the currently stored storage handle.
        pub fn load(&self) -> Storage {
            Storage(self.0.load_full())
        }

        /// Atomically replaces the stored storage handle.
        pub fn store(&self, storage: Storage) {
            self.0.store(storage.0)
        }
    }
}