hyperi_rustlib/config/shared.rs
1// Project: hyperi-rustlib
2// File: src/config/shared.rs
3// Purpose: Thread-safe shared configuration with hot-reload support
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Generic thread-safe shared configuration with version tracking.
10//!
11//! `SharedConfig<T>` wraps any config struct in an `Arc<RwLock<T>>` with a
12//! monotonic version counter and a tokio watch channel for subscriber
13//! notifications. This is the universal building block for hot-reload across
14//! all DFE components (loader, receiver, archiver).
15//!
16//! ## Usage
17//!
18//! ```rust
19//! use hyperi_rustlib::config::shared::SharedConfig;
20//!
21//! #[derive(Clone, Debug, Default)]
22//! struct AppConfig {
23//! pub buffer_size: usize,
24//! pub log_level: String,
25//! }
26//!
27//! // Create shared config
28//! let shared = SharedConfig::new(AppConfig {
29//! buffer_size: 1024,
30//! log_level: "info".into(),
31//! });
32//!
33//! // Read config (zero-copy via read guard)
34//! {
35//! let cfg = shared.read();
36//! assert_eq!(cfg.buffer_size, 1024);
37//! }
38//!
39//! // Subscribe to changes
40//! let mut rx = shared.subscribe();
41//!
42//! // Update config (notifies all subscribers)
43//! let mut new_cfg = shared.get();
44//! new_cfg.buffer_size = 2048;
45//! shared.update(new_cfg);
46//!
47//! assert_eq!(shared.version(), 1);
48//! assert_eq!(*rx.borrow(), 1);
49//! ```
50//!
51//! ## Migration from Component-Specific Implementations
52//!
53//! All DFE components previously had their own `SharedConfig` hard-coded to
54//! their specific `Config` struct. This generic version is a drop-in
55//! replacement:
56//!
57//! ```text
58//! // Before (component-specific):
59//! use crate::config::SharedConfig; // hard-coded to crate::Config
60//!
61//! // After (generic from rustlib):
62//! use hyperi_rustlib::config::SharedConfig; // SharedConfig<Config>
63//! let shared = SharedConfig::new(config); // type inferred from argument
64//! ```
65//!
66//! ### API Compatibility
67//!
68//! | Component Method | rustlib Equivalent | Notes |
69//! |------------------|--------------------|-------|
70//! | `read()` | `read()` | Returns `RwLockReadGuard` |
71//! | `get()` | `get()` | Clones current config |
72//! | `with(f)` | `with(f)` | Closure-based read |
73//! | `update(cfg)` | `update(cfg)` | Write + version bump + notify |
74//! | `version()` | `version()` | Atomic version counter |
75//! | `subscribe()` | `subscribe()` | `watch::Receiver<u64>` |
76//! | `clone_inner()` | Removed | Use `Clone` on `SharedConfig` instead |
77
78use std::sync::Arc;
79use std::sync::atomic::{AtomicU64, Ordering};
80
81use parking_lot::RwLock;
82use tokio::sync::watch;
83use tracing::debug;
84
85/// Thread-safe shared configuration with version tracking and change
86/// notification.
87///
88/// Designed for hot-reload: components subscribe to config changes via the
89/// watch channel and react accordingly. The version counter provides a cheap
90/// way to detect whether config has changed since last check.
91///
92/// `T` must be `Clone` (for `get()`), `Send + Sync` (for cross-thread access),
93/// and `'static` (for use in async tasks).
94pub struct SharedConfig<T> {
95 inner: Arc<RwLock<T>>,
96 version: Arc<AtomicU64>,
97 watch_tx: Arc<watch::Sender<u64>>,
98 watch_rx: watch::Receiver<u64>,
99}
100
101impl<T: Clone + Send + Sync + 'static> SharedConfig<T> {
102 /// Create a new shared config from an initial value.
103 ///
104 /// Version starts at 0. First `update()` bumps to 1.
105 #[must_use]
106 pub fn new(config: T) -> Self {
107 let (watch_tx, watch_rx) = watch::channel(0);
108
109 Self {
110 inner: Arc::new(RwLock::new(config)),
111 version: Arc::new(AtomicU64::new(0)),
112 watch_tx: Arc::new(watch_tx),
113 watch_rx,
114 }
115 }
116
117 /// Read the current configuration (zero-copy).
118 ///
119 /// Returns a read guard that holds the lock. Multiple readers can hold
120 /// the lock simultaneously. Prefer this over `get()` when you don't need
121 /// to hold the config across await points.
122 #[inline]
123 pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, T> {
124 self.inner.read()
125 }
126
127 /// Get a clone of the current configuration.
128 ///
129 /// Use when you need to hold the config across await points or pass it
130 /// to other functions. For read-only access in synchronous code, prefer
131 /// `read()` or `with()`.
132 #[must_use]
133 pub fn get(&self) -> T {
134 self.inner.read().clone()
135 }
136
137 /// Access configuration via a closure (avoids cloning).
138 ///
139 /// The closure receives a reference to the current config while holding
140 /// the read lock. Useful for extracting a few fields without cloning the
141 /// entire struct.
142 pub fn with<F, R>(&self, f: F) -> R
143 where
144 F: FnOnce(&T) -> R,
145 {
146 let guard = self.inner.read();
147 f(&guard)
148 }
149
150 /// Get the current config version.
151 ///
152 /// Version is 0 after creation, incremented by 1 on each `update()`.
153 /// Monotonically increasing -- never decreases.
154 #[inline]
155 #[must_use]
156 pub fn version(&self) -> u64 {
157 self.version.load(Ordering::Acquire)
158 }
159
160 /// Update the configuration atomically.
161 ///
162 /// This will:
163 /// 1. Acquire write lock and replace the config
164 /// 2. Increment the version counter
165 /// 3. Notify all subscribers via the watch channel
166 pub fn update(&self, new_config: T) {
167 {
168 let mut guard = self.inner.write();
169 *guard = new_config;
170 }
171
172 let new_version = self.version.fetch_add(1, Ordering::AcqRel) + 1;
173
174 // Notify subscribers (ignore error if no receivers)
175 let _ = self.watch_tx.send(new_version);
176
177 debug!(version = new_version, "Configuration updated");
178 }
179
180 /// Subscribe to configuration changes.
181 ///
182 /// Returns a `watch::Receiver<u64>` that yields the new version number
183 /// on each config update. Use `rx.changed().await` to wait for the next
184 /// change.
185 ///
186 /// ```ignore
187 /// let mut rx = shared.subscribe();
188 /// loop {
189 /// rx.changed().await.unwrap();
190 /// let version = *rx.borrow();
191 /// let config = shared.get();
192 /// // React to config change...
193 /// }
194 /// ```
195 #[must_use]
196 pub fn subscribe(&self) -> watch::Receiver<u64> {
197 self.watch_rx.clone()
198 }
199}
200
201impl<T: Clone + Send + Sync + 'static> Clone for SharedConfig<T> {
202 fn clone(&self) -> Self {
203 Self {
204 inner: self.inner.clone(),
205 version: self.version.clone(),
206 watch_tx: self.watch_tx.clone(),
207 watch_rx: self.watch_rx.clone(),
208 }
209 }
210}
211
212impl<T: Clone + Send + Sync + Default + 'static> Default for SharedConfig<T> {
213 fn default() -> Self {
214 Self::new(T::default())
215 }
216}
217
218impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for SharedConfig<T> {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 f.debug_struct("SharedConfig")
221 .field("version", &self.version())
222 .finish()
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal config payload used by every test in this module.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        pub name: String,
        pub value: u64,
    }

    /// Shorthand for building a `TestConfig`.
    fn make_cfg(name: &str, value: u64) -> TestConfig {
        TestConfig {
            name: name.into(),
            value,
        }
    }

    /// A freshly created config reports version 0.
    #[test]
    fn test_new_starts_at_version_zero() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);
    }

    /// `read()` exposes the value passed to `new()`.
    #[test]
    fn test_read_returns_initial_value() {
        let shared = SharedConfig::new(make_cfg("test", 42));

        let guard = shared.read();
        assert_eq!(guard.name, "test");
        assert_eq!(guard.value, 42);
    }

    /// `get()` returns an owned copy of the current value.
    #[test]
    fn test_get_clones_current_config() {
        let shared = SharedConfig::new(make_cfg("original", 1));

        let snapshot = shared.get();
        assert_eq!(snapshot.name, "original");
    }

    /// `with()` runs the closure against the current value and returns its
    /// result.
    #[test]
    fn test_with_closure_access() {
        let shared = SharedConfig::new(make_cfg("closure", 99));

        let extracted = shared.with(|c| c.value);
        assert_eq!(extracted, 99);
    }

    /// Each `update()` raises the version by exactly one.
    #[test]
    fn test_update_increments_version() {
        let shared = SharedConfig::new(TestConfig::default());
        assert_eq!(shared.version(), 0);

        shared.update(make_cfg("v1", 1));
        assert_eq!(shared.version(), 1);

        shared.update(make_cfg("v2", 2));
        assert_eq!(shared.version(), 2);
    }

    /// After `update()` readers see the new value.
    #[test]
    fn test_update_changes_visible() {
        let shared = SharedConfig::new(TestConfig::default());

        shared.update(make_cfg("updated", 100));

        let guard = shared.read();
        assert_eq!(guard.name, "updated");
        assert_eq!(guard.value, 100);
    }

    /// A subscriber is woken by `update()` and observes the new version.
    #[tokio::test]
    async fn test_subscribe_receives_notification() {
        let shared = SharedConfig::new(TestConfig::default());
        let mut rx = shared.subscribe();
        assert_eq!(*rx.borrow(), 0);

        shared.update(make_cfg("notify", 1));

        rx.changed().await.expect("should receive change");
        assert_eq!(*rx.borrow(), 1);
    }

    /// Every subscriber is notified of the same update.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let shared = SharedConfig::new(TestConfig::default());

        let mut receivers = [
            ("subscriber 1", shared.subscribe()),
            ("subscriber 2", shared.subscribe()),
            ("subscriber 3", shared.subscribe()),
        ];

        shared.update(make_cfg("multi", 1));

        // Wait for all three wake-ups first, then check the observed versions.
        for (label, rx) in &mut receivers {
            rx.changed().await.expect(*label);
        }
        for (_, rx) in &receivers {
            assert_eq!(*rx.borrow(), 1);
        }
    }

    /// A clone and its original share both the value and the version.
    #[test]
    fn test_clone_shares_state() {
        let original = SharedConfig::new(TestConfig::default());
        let cloned = original.clone();

        // Update through the original, observe through the clone.
        original.update(make_cfg("from-original", 1));
        assert_eq!(cloned.read().name, "from-original");
        assert_eq!(cloned.version(), 1);

        // Update through the clone, observe through the original.
        cloned.update(make_cfg("from-clone", 2));
        assert_eq!(original.read().name, "from-clone");
        assert_eq!(original.version(), 2);
    }

    /// `Default` wraps `T::default()` at version 0.
    #[test]
    fn test_default() {
        let shared = SharedConfig::<TestConfig>::default();
        assert_eq!(shared.version(), 0);
        assert_eq!(shared.read().name, "");
    }

    /// A reader task interleaved with a stream of updates only ever sees one
    /// of the values that was actually installed.
    #[tokio::test]
    async fn test_concurrent_read_during_update() {
        let shared = SharedConfig::new(make_cfg("initial", 0));
        let reader_handle = shared.clone();

        // Reader: sample the name 100 times, yielding between samples.
        let reader = tokio::spawn(async move {
            let mut seen = Vec::new();
            for _ in 0..100 {
                seen.push(reader_handle.with(|c| c.name.clone()));
                tokio::task::yield_now().await;
            }
            seen
        });

        // Writer: alternate between "even" and "odd", yielding between updates.
        for i in 0..50 {
            let label = if i % 2 == 0 { "even" } else { "odd" };
            shared.update(make_cfg(label, i));
            tokio::task::yield_now().await;
        }

        // Reader should never panic, all values should be valid
        let seen = reader.await.expect("reader task should not panic");
        for v in &seen {
            assert!(
                matches!(v.as_str(), "initial" | "even" | "odd"),
                "unexpected value: {v}"
            );
        }
    }
}