Skip to main content

hyperi_rustlib/config/
shared.rs

1// Project:   hyperi-rustlib
2// File:      src/config/shared.rs
3// Purpose:   Thread-safe shared configuration with hot-reload support
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Generic thread-safe shared configuration with version tracking.
10//!
11//! `SharedConfig<T>` wraps any config struct in an `Arc<RwLock<T>>` with a
12//! monotonic version counter and a tokio watch channel for subscriber
13//! notifications. This is the universal building block for hot-reload across
14//! all DFE components (loader, receiver, archiver).
15//!
16//! ## Usage
17//!
18//! ```rust
19//! use hyperi_rustlib::config::shared::SharedConfig;
20//!
21//! #[derive(Clone, Debug, Default)]
22//! struct AppConfig {
23//!     pub buffer_size: usize,
24//!     pub log_level: String,
25//! }
26//!
27//! // Create shared config
28//! let shared = SharedConfig::new(AppConfig {
29//!     buffer_size: 1024,
30//!     log_level: "info".into(),
31//! });
32//!
33//! // Read config (zero-copy via read guard)
34//! {
35//!     let cfg = shared.read();
36//!     assert_eq!(cfg.buffer_size, 1024);
37//! }
38//!
39//! // Subscribe to changes
40//! let mut rx = shared.subscribe();
41//!
42//! // Update config (notifies all subscribers)
43//! let mut new_cfg = shared.get();
44//! new_cfg.buffer_size = 2048;
45//! shared.update(new_cfg);
46//!
47//! assert_eq!(shared.version(), 1);
48//! assert_eq!(*rx.borrow(), 1);
49//! ```
50//!
51//! Drop-in replacement for the per-component `SharedConfig` structs DFE
52//! components used to hard-code to their own `Config`. Same method names
53//! (`read`/`get`/`with`/`update`/`version`/`subscribe`); type is inferred
54//! from the argument. The old `clone_inner()` is gone -- clone the
55//! `SharedConfig` itself.
56
57use std::sync::Arc;
58use std::sync::atomic::{AtomicU64, Ordering};
59
60use parking_lot::RwLock;
61use tokio::sync::watch;
62use tracing::debug;
63
64/// Thread-safe shared configuration with version tracking and change
65/// notification.
66///
67/// Designed for hot-reload: components subscribe to config changes via the
68/// watch channel and react accordingly. The version counter provides a cheap
69/// way to detect whether config has changed since last check.
70///
71/// `T` must be `Clone` (for `get()`), `Send + Sync` (for cross-thread access),
72/// and `'static` (for use in async tasks).
73pub struct SharedConfig<T> {
74    inner: Arc<RwLock<T>>,
75    version: Arc<AtomicU64>,
76    watch_tx: Arc<watch::Sender<u64>>,
77    watch_rx: watch::Receiver<u64>,
78}
79
80impl<T: Clone + Send + Sync + 'static> SharedConfig<T> {
81    /// Create a new shared config from an initial value.
82    ///
83    /// Version starts at 0. First `update()` bumps to 1.
84    #[must_use]
85    pub fn new(config: T) -> Self {
86        let (watch_tx, watch_rx) = watch::channel(0);
87
88        Self {
89            inner: Arc::new(RwLock::new(config)),
90            version: Arc::new(AtomicU64::new(0)),
91            watch_tx: Arc::new(watch_tx),
92            watch_rx,
93        }
94    }
95
96    /// Read the current configuration (zero-copy).
97    ///
98    /// Returns a read guard that holds the lock. Multiple readers can hold
99    /// the lock simultaneously. Prefer this over `get()` when you don't need
100    /// to hold the config across await points.
101    #[inline]
102    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, T> {
103        self.inner.read()
104    }
105
106    /// Get a clone of the current configuration.
107    ///
108    /// Use when you need to hold the config across await points or pass it
109    /// to other functions. For read-only access in synchronous code, prefer
110    /// `read()` or `with()`.
111    #[must_use]
112    pub fn get(&self) -> T {
113        self.inner.read().clone()
114    }
115
116    /// Access configuration via a closure (avoids cloning).
117    ///
118    /// The closure receives a reference to the current config while holding
119    /// the read lock. Useful for extracting a few fields without cloning the
120    /// entire struct.
121    pub fn with<F, R>(&self, f: F) -> R
122    where
123        F: FnOnce(&T) -> R,
124    {
125        let guard = self.inner.read();
126        f(&guard)
127    }
128
129    /// Get the current config version.
130    ///
131    /// Version is 0 after creation, incremented by 1 on each `update()`.
132    /// Monotonically increasing -- never decreases.
133    #[inline]
134    #[must_use]
135    pub fn version(&self) -> u64 {
136        self.version.load(Ordering::Acquire)
137    }
138
139    /// Update the configuration atomically.
140    ///
141    /// This will:
142    /// 1. Acquire write lock and replace the config
143    /// 2. Increment the version counter
144    /// 3. Notify all subscribers via the watch channel
145    pub fn update(&self, new_config: T) {
146        {
147            let mut guard = self.inner.write();
148            *guard = new_config;
149        }
150
151        let new_version = self.version.fetch_add(1, Ordering::AcqRel) + 1;
152
153        // Notify subscribers (ignore error if no receivers)
154        let _ = self.watch_tx.send(new_version);
155
156        debug!(version = new_version, "Configuration updated");
157    }
158
159    /// Subscribe to configuration changes.
160    ///
161    /// Returns a `watch::Receiver<u64>` that yields the new version number
162    /// on each config update. Use `rx.changed().await` to wait for the next
163    /// change.
164    ///
165    /// ```ignore
166    /// let mut rx = shared.subscribe();
167    /// loop {
168    ///     rx.changed().await.unwrap();
169    ///     let version = *rx.borrow();
170    ///     let config = shared.get();
171    ///     // React to config change...
172    /// }
173    /// ```
174    #[must_use]
175    pub fn subscribe(&self) -> watch::Receiver<u64> {
176        self.watch_rx.clone()
177    }
178}
179
180impl<T: Clone + Send + Sync + 'static> Clone for SharedConfig<T> {
181    fn clone(&self) -> Self {
182        Self {
183            inner: self.inner.clone(),
184            version: self.version.clone(),
185            watch_tx: self.watch_tx.clone(),
186            watch_rx: self.watch_rx.clone(),
187        }
188    }
189}
190
191impl<T: Clone + Send + Sync + Default + 'static> Default for SharedConfig<T> {
192    fn default() -> Self {
193        Self::new(T::default())
194    }
195}
196
197impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for SharedConfig<T> {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        f.debug_struct("SharedConfig")
200            .field("version", &self.version())
201            .finish()
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Small config payload used by every test in this module.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        pub name: String,
        pub value: u64,
    }

    /// Build a `TestConfig` from its two fields.
    fn cfg(name: &str, value: u64) -> TestConfig {
        TestConfig {
            name: name.into(),
            value,
        }
    }

    /// A freshly constructed handle reports version 0.
    #[test]
    fn test_new_starts_at_version_zero() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);
    }

    /// `read()` exposes the value passed to `new()`.
    #[test]
    fn test_read_returns_initial_value() {
        let shared = SharedConfig::new(cfg("test", 42));

        let guard = shared.read();
        assert_eq!(guard.name, "test");
        assert_eq!(guard.value, 42);
    }

    /// `get()` returns an owned copy of the current value.
    #[test]
    fn test_get_clones_current_config() {
        let shared = SharedConfig::new(cfg("original", 1));

        let got = shared.get();
        assert_eq!(got.name, "original");
    }

    /// `with()` passes the current value to the closure and returns its result.
    #[test]
    fn test_with_closure_access() {
        let shared = SharedConfig::new(cfg("closure", 99));

        let val = shared.with(|c| c.value);
        assert_eq!(val, 99);
    }

    /// Each `update()` raises the version by exactly one.
    #[test]
    fn test_update_increments_version() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);

        shared.update(cfg("v1", 1));
        assert_eq!(shared.version(), 1);

        shared.update(cfg("v2", 2));
        assert_eq!(shared.version(), 2);
    }

    /// A value installed by `update()` is what later reads observe.
    #[test]
    fn test_update_changes_visible() {
        let shared = SharedConfig::new(TestConfig::default());

        shared.update(cfg("updated", 100));

        assert_eq!(shared.read().name, "updated");
        assert_eq!(shared.read().value, 100);
    }

    /// A subscriber taken before an update is woken and sees the new version.
    #[tokio::test]
    async fn test_subscribe_receives_notification() {
        let shared = SharedConfig::new(TestConfig::default());
        let mut rx = shared.subscribe();

        assert_eq!(*rx.borrow(), 0);

        shared.update(cfg("notify", 1));

        rx.changed().await.expect("should receive change");
        assert_eq!(*rx.borrow(), 1);
    }

    /// Every subscriber is woken by a single update.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let shared = SharedConfig::new(TestConfig::default());

        let mut rx1 = shared.subscribe();
        let mut rx2 = shared.subscribe();
        let mut rx3 = shared.subscribe();

        shared.update(cfg("multi", 1));

        rx1.changed().await.expect("subscriber 1");
        rx2.changed().await.expect("subscriber 2");
        rx3.changed().await.expect("subscriber 3");

        assert_eq!(*rx1.borrow(), 1);
        assert_eq!(*rx2.borrow(), 1);
        assert_eq!(*rx3.borrow(), 1);
    }

    /// A cloned handle and its original observe each other's updates.
    #[test]
    fn test_clone_shares_state() {
        let shared = SharedConfig::new(TestConfig::default());
        let cloned = shared.clone();

        // Update through the original, observe through the clone.
        shared.update(cfg("from-original", 1));
        assert_eq!(cloned.read().name, "from-original");
        assert_eq!(cloned.version(), 1);

        // Update through the clone, observe through the original.
        cloned.update(cfg("from-clone", 2));
        assert_eq!(shared.read().name, "from-clone");
        assert_eq!(shared.version(), 2);
    }

    /// `Default` wraps `T::default()` at version 0.
    #[test]
    fn test_default() {
        let shared = SharedConfig::<TestConfig>::default();
        assert_eq!(shared.version(), 0);
        assert_eq!(shared.read().name, "");
    }

    /// A reader task interleaved with updates only ever sees one of the
    /// values that was actually installed.
    #[tokio::test]
    async fn test_concurrent_read_during_update() {
        let shared = SharedConfig::new(cfg("initial", 0));

        let shared_clone = shared.clone();

        // Reader task: sample the name 100 times, yielding between samples.
        let reader = tokio::spawn(async move {
            let mut values = Vec::new();
            for _ in 0..100 {
                values.push(shared_clone.with(|c| c.name.clone()));
                tokio::task::yield_now().await;
            }
            values
        });

        // Writer: alternate between two names, yielding between updates.
        for i in 0..50u64 {
            let name = if i % 2 == 0 { "even" } else { "odd" };
            shared.update(cfg(name, i));
            tokio::task::yield_now().await;
        }

        // The reader must finish without panicking, and every sample must be
        // one of the three names that were ever stored.
        let values = reader.await.expect("reader task should not panic");
        for v in &values {
            assert!(
                matches!(v.as_str(), "initial" | "even" | "odd"),
                "unexpected value: {v}"
            );
        }
    }
}