hyperi_rustlib/config/
shared.rs1use std::sync::Arc;
58use std::sync::atomic::{AtomicU64, Ordering};
59
60use parking_lot::RwLock;
61use tokio::sync::watch;
62use tracing::debug;
63
64pub struct SharedConfig<T> {
74 inner: Arc<RwLock<T>>,
75 version: Arc<AtomicU64>,
76 watch_tx: Arc<watch::Sender<u64>>,
77 watch_rx: watch::Receiver<u64>,
78}
79
80impl<T: Clone + Send + Sync + 'static> SharedConfig<T> {
81 #[must_use]
85 pub fn new(config: T) -> Self {
86 let (watch_tx, watch_rx) = watch::channel(0);
87
88 Self {
89 inner: Arc::new(RwLock::new(config)),
90 version: Arc::new(AtomicU64::new(0)),
91 watch_tx: Arc::new(watch_tx),
92 watch_rx,
93 }
94 }
95
96 #[inline]
102 pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, T> {
103 self.inner.read()
104 }
105
106 #[must_use]
112 pub fn get(&self) -> T {
113 self.inner.read().clone()
114 }
115
116 pub fn with<F, R>(&self, f: F) -> R
122 where
123 F: FnOnce(&T) -> R,
124 {
125 let guard = self.inner.read();
126 f(&guard)
127 }
128
129 #[inline]
134 #[must_use]
135 pub fn version(&self) -> u64 {
136 self.version.load(Ordering::Acquire)
137 }
138
139 pub fn update(&self, new_config: T) {
146 {
147 let mut guard = self.inner.write();
148 *guard = new_config;
149 }
150
151 let new_version = self.version.fetch_add(1, Ordering::AcqRel) + 1;
152
153 let _ = self.watch_tx.send(new_version);
155
156 debug!(version = new_version, "Configuration updated");
157 }
158
159 #[must_use]
175 pub fn subscribe(&self) -> watch::Receiver<u64> {
176 self.watch_rx.clone()
177 }
178}
179
180impl<T: Clone + Send + Sync + 'static> Clone for SharedConfig<T> {
181 fn clone(&self) -> Self {
182 Self {
183 inner: self.inner.clone(),
184 version: self.version.clone(),
185 watch_tx: self.watch_tx.clone(),
186 watch_rx: self.watch_rx.clone(),
187 }
188 }
189}
190
191impl<T: Clone + Send + Sync + Default + 'static> Default for SharedConfig<T> {
192 fn default() -> Self {
193 Self::new(T::default())
194 }
195}
196
197impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for SharedConfig<T> {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("SharedConfig")
200 .field("version", &self.version())
201 .finish()
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Small payload type used as the configuration in every test.
    #[derive(Clone, Debug, Default, PartialEq)]
    struct TestConfig {
        pub name: String,
        pub value: u64,
    }

    /// Builds a `TestConfig` from a name and a value.
    fn cfg(name: &str, value: u64) -> TestConfig {
        TestConfig {
            name: name.into(),
            value,
        }
    }

    /// A freshly created handle reports version 0.
    #[test]
    fn test_new_starts_at_version_zero() {
        let handle = SharedConfig::new(TestConfig::default());
        assert_eq!(handle.version(), 0);
    }

    /// `read` exposes the value the handle was created with.
    #[test]
    fn test_read_returns_initial_value() {
        let initial = cfg("test", 42);
        let handle = SharedConfig::new(initial.clone());

        let view = handle.read();
        assert_eq!(view.name, "test");
        assert_eq!(view.value, 42);
    }

    /// `get` hands back an owned copy of the current value.
    #[test]
    fn test_get_clones_current_config() {
        let handle = SharedConfig::new(cfg("original", 1));

        let snapshot = handle.get();
        assert_eq!(snapshot.name, "original");
    }

    /// `with` passes the current value to the closure and returns its result.
    #[test]
    fn test_with_closure_access() {
        let handle = SharedConfig::new(cfg("closure", 99));

        let seen = handle.with(|c| c.value);
        assert_eq!(seen, 99);
    }

    /// Each `update` raises the version by exactly one.
    #[test]
    fn test_update_increments_version() {
        let handle = SharedConfig::new(TestConfig::default());
        assert_eq!(handle.version(), 0);

        handle.update(cfg("v1", 1));
        assert_eq!(handle.version(), 1);

        handle.update(cfg("v2", 2));
        assert_eq!(handle.version(), 2);
    }

    /// After `update`, readers see the replacement value.
    #[test]
    fn test_update_changes_visible() {
        let handle = SharedConfig::new(TestConfig::default());

        handle.update(cfg("updated", 100));

        assert_eq!(handle.read().name, "updated");
        assert_eq!(handle.read().value, 100);
    }

    /// A subscriber created before an update is woken and reads the new version.
    #[tokio::test]
    async fn test_subscribe_receives_notification() {
        let handle = SharedConfig::new(TestConfig::default());
        let mut rx = handle.subscribe();

        assert_eq!(*rx.borrow(), 0);

        handle.update(cfg("notify", 1));

        rx.changed().await.expect("should receive change");
        assert_eq!(*rx.borrow(), 1);
    }

    /// Every subscriber is notified of the same update.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let handle = SharedConfig::new(TestConfig::default());

        let mut subscribers = [
            (handle.subscribe(), "subscriber 1"),
            (handle.subscribe(), "subscriber 2"),
            (handle.subscribe(), "subscriber 3"),
        ];

        handle.update(cfg("multi", 1));

        // Wait on each receiver in turn, in creation order.
        for (rx, label) in &mut subscribers {
            rx.changed().await.expect(label);
        }

        for (rx, _) in &subscribers {
            assert_eq!(*rx.borrow(), 1);
        }
    }

    /// A clone and its source observe each other's updates.
    #[test]
    fn test_clone_shares_state() {
        let original = SharedConfig::new(TestConfig::default());
        let copy = original.clone();

        original.update(cfg("from-original", 1));

        assert_eq!(copy.read().name, "from-original");
        assert_eq!(copy.version(), 1);

        copy.update(cfg("from-clone", 2));

        assert_eq!(original.read().name, "from-clone");
        assert_eq!(original.version(), 2);
    }

    /// `Default` wraps `TestConfig::default()` at version 0.
    #[test]
    fn test_default() {
        let handle: SharedConfig<TestConfig> = SharedConfig::default();
        assert_eq!(handle.version(), 0);
        assert_eq!(handle.read().name, "");
    }

    /// Interleaves a reading task with a stream of updates and checks that
    /// every observed name is one of the values that was actually stored.
    #[tokio::test]
    async fn test_concurrent_read_during_update() {
        let writer = SharedConfig::new(cfg("initial", 0));
        let reader_handle = writer.clone();

        let reader = tokio::spawn(async move {
            let mut observed = Vec::new();
            for _ in 0..100 {
                observed.push(reader_handle.with(|c| c.name.clone()));
                tokio::task::yield_now().await;
            }
            observed
        });

        for i in 0..50 {
            let label = if i % 2 == 0 { "even" } else { "odd" };
            writer.update(cfg(label, i));
            tokio::task::yield_now().await;
        }

        let observed = reader.await.expect("reader task should not panic");
        for v in &observed {
            assert!(
                matches!(v.as_str(), "initial" | "even" | "odd"),
                "unexpected value: {v}"
            );
        }
    }
}