Skip to main content

hyperi_rustlib/config/
shared.rs

1// Project:   hyperi-rustlib
2// File:      src/config/shared.rs
3// Purpose:   Thread-safe shared configuration with hot-reload support
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Generic thread-safe shared configuration with version tracking.
10//!
11//! `SharedConfig<T>` wraps any config struct in an `Arc<RwLock<T>>` with a
12//! monotonic version counter and a tokio watch channel for subscriber
13//! notifications. This is the universal building block for hot-reload across
14//! all DFE components (loader, receiver, archiver).
15//!
16//! ## Usage
17//!
18//! ```rust
19//! use hyperi_rustlib::config::shared::SharedConfig;
20//!
21//! #[derive(Clone, Debug, Default)]
22//! struct AppConfig {
23//!     pub buffer_size: usize,
24//!     pub log_level: String,
25//! }
26//!
27//! // Create shared config
28//! let shared = SharedConfig::new(AppConfig {
29//!     buffer_size: 1024,
30//!     log_level: "info".into(),
31//! });
32//!
33//! // Read config (zero-copy via read guard)
34//! {
35//!     let cfg = shared.read();
36//!     assert_eq!(cfg.buffer_size, 1024);
37//! }
38//!
39//! // Subscribe to changes
40//! let mut rx = shared.subscribe();
41//!
42//! // Update config (notifies all subscribers)
43//! let mut new_cfg = shared.get();
44//! new_cfg.buffer_size = 2048;
45//! shared.update(new_cfg);
46//!
47//! assert_eq!(shared.version(), 1);
48//! assert_eq!(*rx.borrow(), 1);
49//! ```
50//!
51//! ## Migration from Component-Specific Implementations
52//!
53//! All DFE components previously had their own `SharedConfig` hard-coded to
54//! their specific `Config` struct. This generic version is a drop-in
55//! replacement:
56//!
57//! ```text
58//! // Before (component-specific):
59//! use crate::config::SharedConfig;           // hard-coded to crate::Config
60//!
61//! // After (generic from rustlib):
62//! use hyperi_rustlib::config::SharedConfig;   // SharedConfig<Config>
63//! let shared = SharedConfig::new(config);     // type inferred from argument
64//! ```
65//!
66//! ### API Compatibility
67//!
68//! | Component Method | rustlib Equivalent | Notes |
69//! |------------------|--------------------|-------|
70//! | `read()` | `read()` | Returns `RwLockReadGuard` |
71//! | `get()` | `get()` | Clones current config |
72//! | `with(f)` | `with(f)` | Closure-based read |
73//! | `update(cfg)` | `update(cfg)` | Write + version bump + notify |
74//! | `version()` | `version()` | Atomic version counter |
75//! | `subscribe()` | `subscribe()` | `watch::Receiver<u64>` |
76//! | `clone_inner()` | Removed | Use `Clone` on `SharedConfig` instead |
77
78use std::sync::Arc;
79use std::sync::atomic::{AtomicU64, Ordering};
80
81use parking_lot::RwLock;
82use tokio::sync::watch;
83use tracing::debug;
84
85/// Thread-safe shared configuration with version tracking and change
86/// notification.
87///
88/// Designed for hot-reload: components subscribe to config changes via the
89/// watch channel and react accordingly. The version counter provides a cheap
90/// way to detect whether config has changed since last check.
91///
92/// `T` must be `Clone` (for `get()`), `Send + Sync` (for cross-thread access),
93/// and `'static` (for use in async tasks).
94pub struct SharedConfig<T> {
95    inner: Arc<RwLock<T>>,
96    version: Arc<AtomicU64>,
97    watch_tx: Arc<watch::Sender<u64>>,
98    watch_rx: watch::Receiver<u64>,
99}
100
101impl<T: Clone + Send + Sync + 'static> SharedConfig<T> {
102    /// Create a new shared config from an initial value.
103    ///
104    /// Version starts at 0. First `update()` bumps to 1.
105    #[must_use]
106    pub fn new(config: T) -> Self {
107        let (watch_tx, watch_rx) = watch::channel(0);
108
109        Self {
110            inner: Arc::new(RwLock::new(config)),
111            version: Arc::new(AtomicU64::new(0)),
112            watch_tx: Arc::new(watch_tx),
113            watch_rx,
114        }
115    }
116
117    /// Read the current configuration (zero-copy).
118    ///
119    /// Returns a read guard that holds the lock. Multiple readers can hold
120    /// the lock simultaneously. Prefer this over `get()` when you don't need
121    /// to hold the config across await points.
122    #[inline]
123    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, T> {
124        self.inner.read()
125    }
126
127    /// Get a clone of the current configuration.
128    ///
129    /// Use when you need to hold the config across await points or pass it
130    /// to other functions. For read-only access in synchronous code, prefer
131    /// `read()` or `with()`.
132    #[must_use]
133    pub fn get(&self) -> T {
134        self.inner.read().clone()
135    }
136
137    /// Access configuration via a closure (avoids cloning).
138    ///
139    /// The closure receives a reference to the current config while holding
140    /// the read lock. Useful for extracting a few fields without cloning the
141    /// entire struct.
142    pub fn with<F, R>(&self, f: F) -> R
143    where
144        F: FnOnce(&T) -> R,
145    {
146        let guard = self.inner.read();
147        f(&guard)
148    }
149
150    /// Get the current config version.
151    ///
152    /// Version is 0 after creation, incremented by 1 on each `update()`.
153    /// Monotonically increasing -- never decreases.
154    #[inline]
155    #[must_use]
156    pub fn version(&self) -> u64 {
157        self.version.load(Ordering::Acquire)
158    }
159
160    /// Update the configuration atomically.
161    ///
162    /// This will:
163    /// 1. Acquire write lock and replace the config
164    /// 2. Increment the version counter
165    /// 3. Notify all subscribers via the watch channel
166    pub fn update(&self, new_config: T) {
167        {
168            let mut guard = self.inner.write();
169            *guard = new_config;
170        }
171
172        let new_version = self.version.fetch_add(1, Ordering::AcqRel) + 1;
173
174        // Notify subscribers (ignore error if no receivers)
175        let _ = self.watch_tx.send(new_version);
176
177        debug!(version = new_version, "Configuration updated");
178    }
179
180    /// Subscribe to configuration changes.
181    ///
182    /// Returns a `watch::Receiver<u64>` that yields the new version number
183    /// on each config update. Use `rx.changed().await` to wait for the next
184    /// change.
185    ///
186    /// ```ignore
187    /// let mut rx = shared.subscribe();
188    /// loop {
189    ///     rx.changed().await.unwrap();
190    ///     let version = *rx.borrow();
191    ///     let config = shared.get();
192    ///     // React to config change...
193    /// }
194    /// ```
195    #[must_use]
196    pub fn subscribe(&self) -> watch::Receiver<u64> {
197        self.watch_rx.clone()
198    }
199}
200
201impl<T: Clone + Send + Sync + 'static> Clone for SharedConfig<T> {
202    fn clone(&self) -> Self {
203        Self {
204            inner: self.inner.clone(),
205            version: self.version.clone(),
206            watch_tx: self.watch_tx.clone(),
207            watch_rx: self.watch_rx.clone(),
208        }
209    }
210}
211
212impl<T: Clone + Send + Sync + Default + 'static> Default for SharedConfig<T> {
213    fn default() -> Self {
214        Self::new(T::default())
215    }
216}
217
218impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for SharedConfig<T> {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        f.debug_struct("SharedConfig")
221            .field("version", &self.version())
222            .finish()
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal config payload shared by every test in this module.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        pub name: String,
        pub value: u64,
    }

    /// Builds a `TestConfig` from a name and a value.
    fn cfg(name: &str, value: u64) -> TestConfig {
        TestConfig {
            name: name.into(),
            value,
        }
    }

    // A freshly constructed handle reports version 0.
    #[test]
    fn test_new_starts_at_version_zero() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);
    }

    // `read()` exposes the value passed to `new()`.
    #[test]
    fn test_read_returns_initial_value() {
        let shared = SharedConfig::new(cfg("test", 42));

        let guard = shared.read();
        assert_eq!(guard.name, "test");
        assert_eq!(guard.value, 42);
    }

    // `get()` hands back an owned copy of the stored value.
    #[test]
    fn test_get_clones_current_config() {
        let shared = SharedConfig::new(cfg("original", 1));

        let got = shared.get();
        assert_eq!(got.name, "original");
    }

    // `with()` runs the closure against the stored value and returns its result.
    #[test]
    fn test_with_closure_access() {
        let shared = SharedConfig::new(cfg("closure", 99));

        let val = shared.with(|c| c.value);
        assert_eq!(val, 99);
    }

    // Each `update()` moves the version forward by exactly one.
    #[test]
    fn test_update_increments_version() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);

        shared.update(cfg("v1", 1));
        assert_eq!(shared.version(), 1);

        shared.update(cfg("v2", 2));
        assert_eq!(shared.version(), 2);
    }

    // After `update()`, readers see the new value.
    #[test]
    fn test_update_changes_visible() {
        let shared = SharedConfig::new(TestConfig::default());

        shared.update(cfg("updated", 100));

        assert_eq!(shared.read().name, "updated");
        assert_eq!(shared.read().value, 100);
    }

    // A subscriber created before an update is woken and sees the new version.
    #[tokio::test]
    async fn test_subscribe_receives_notification() {
        let shared = SharedConfig::new(TestConfig::default());
        let mut rx = shared.subscribe();

        assert_eq!(*rx.borrow(), 0);

        shared.update(cfg("notify", 1));

        rx.changed().await.expect("should receive change");
        assert_eq!(*rx.borrow(), 1);
    }

    // Every subscriber is notified of the same update.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let shared = SharedConfig::new(TestConfig::default());

        let mut rx1 = shared.subscribe();
        let mut rx2 = shared.subscribe();
        let mut rx3 = shared.subscribe();

        shared.update(cfg("multi", 1));

        rx1.changed().await.expect("subscriber 1");
        rx2.changed().await.expect("subscriber 2");
        rx3.changed().await.expect("subscriber 3");

        assert_eq!(*rx1.borrow(), 1);
        assert_eq!(*rx2.borrow(), 1);
        assert_eq!(*rx3.borrow(), 1);
    }

    // Clones are handles to the same state, in both directions.
    #[test]
    fn test_clone_shares_state() {
        let shared = SharedConfig::new(TestConfig::default());
        let cloned = shared.clone();

        // An update through the original is visible through the clone.
        shared.update(cfg("from-original", 1));
        assert_eq!(cloned.read().name, "from-original");
        assert_eq!(cloned.version(), 1);

        // An update through the clone is visible through the original.
        cloned.update(cfg("from-clone", 2));
        assert_eq!(shared.read().name, "from-clone");
        assert_eq!(shared.version(), 2);
    }

    // `Default` wraps `T::default()` at version 0.
    #[test]
    fn test_default() {
        let shared: SharedConfig<TestConfig> = SharedConfig::default();
        assert_eq!(shared.version(), 0);
        assert_eq!(shared.read().name, "");
    }

    // Reads interleaved with updates only ever observe values that were stored.
    #[tokio::test]
    async fn test_concurrent_read_during_update() {
        let shared = SharedConfig::new(cfg("initial", 0));
        let shared_clone = shared.clone();

        // Reader task: sample the name 100 times, yielding between samples.
        let reader = tokio::spawn(async move {
            let mut values = Vec::new();
            for _ in 0..100 {
                values.push(shared_clone.with(|c| c.name.clone()));
                tokio::task::yield_now().await;
            }
            values
        });

        // Writer: alternate between two names, yielding between updates.
        for i in 0..50 {
            let name = if i % 2 == 0 { "even" } else { "odd" };
            shared.update(cfg(name, i));
            tokio::task::yield_now().await;
        }

        // Reader should never panic, all values should be valid
        let values = reader.await.expect("reader task should not panic");
        for v in &values {
            assert!(
                v == "initial" || v == "even" || v == "odd",
                "unexpected value: {v}"
            );
        }
    }
}