1use arc_swap::ArcSwap;
2use crossbeam_channel::{Receiver, Sender, select, unbounded};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, Weak};
5use std::thread::{self, JoinHandle};
6use std::time::SystemTime;
7
8use crate::{ChangeEvent, ChangeSource, SettingsError, StorageBackend, StoredValue};
9
10pub type ExternalApplier<T> =
11 Box<dyn Fn(&mut T, &str, &StoredValue) -> ApplyResult + Send + Sync + 'static>;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum ApplyResult {
15 Applied,
16 AppliedWithValue { value: StoredValue },
17 Ignored,
18 DeserializeFailure { raw: String, error: String },
19}
20
21pub struct SettingsHandle<T> {
22 inner: Arc<SettingsInner<T>>,
23}
24
25struct SettingsInner<T> {
26 current: ArcSwap<T>,
27 write_lock: Mutex<()>,
28 backend: Mutex<Box<dyn StorageBackend>>,
29 external_applier: ExternalApplier<T>,
30 subscribers: Mutex<Vec<Sender<ChangeEvent>>>,
31 last_seen: Mutex<HashMap<String, StoredValue>>,
34 diff_shutdown_tx: Sender<()>,
35 diff_thread: Option<JoinHandle<()>>,
36}
37
38impl<T> SettingsHandle<T>
39where
40 T: Clone + Send + Sync + 'static,
41{
42 pub fn new(initial: T, backend: Box<dyn StorageBackend>) -> Self {
44 let initial_last_seen = backend.load_all().unwrap_or_default();
45 Self::new_with_stored(initial, backend, initial_last_seen)
46 }
47
48 pub fn new_with_stored(
55 initial: T,
56 backend: Box<dyn StorageBackend>,
57 stored: HashMap<String, StoredValue>,
58 ) -> Self {
59 Self::new_with_stored_and_applier(initial, backend, stored, noop_external_applier())
60 }
61
62 pub fn new_with_stored_and_applier(
71 initial: T,
72 backend: Box<dyn StorageBackend>,
73 stored: HashMap<String, StoredValue>,
74 external_applier: ExternalApplier<T>,
75 ) -> Self {
76 let (diff_shutdown_tx, diff_shutdown_rx) = unbounded::<()>();
77 let commits_rx = backend.watch_changes();
78
79 let inner = Arc::new_cyclic(move |weak: &Weak<SettingsInner<T>>| {
80 let diff_thread = if let Some(commits_rx) = commits_rx {
81 let weak = weak.clone();
82 Some(thread::spawn(move || {
83 diff_loop(weak, commits_rx, diff_shutdown_rx);
84 }))
85 } else {
86 None
87 };
88
89 SettingsInner {
90 current: ArcSwap::from_pointee(initial),
91 write_lock: Mutex::new(()),
92 backend: Mutex::new(backend),
93 external_applier,
94 subscribers: Mutex::new(Vec::new()),
95 last_seen: Mutex::new(stored),
96 diff_shutdown_tx,
97 diff_thread,
98 }
99 });
100
101 Self { inner }
102 }
103
104 pub fn snapshot(&self) -> Arc<T> {
105 self.inner.current.load_full()
106 }
107
108 pub fn on_change(&self) -> Receiver<ChangeEvent> {
109 let (tx, rx) = unbounded();
110 self.inner.subscribers.lock().unwrap().push(tx);
111 rx
112 }
113
114 fn broadcast(&self, event: ChangeEvent) {
115 inner_broadcast(&self.inner, event);
116 }
117
118 pub fn write_field(
119 &self,
120 key: &str,
121 old_value: Option<StoredValue>,
122 new_value: StoredValue,
123 mutator: impl FnOnce(&mut T),
124 ) -> Result<(), SettingsError> {
125 let _writer = self.inner.write_lock.lock().unwrap();
127
128 let backend = self.inner.backend.lock().unwrap();
130
131 backend.set(key, &new_value)?;
133
134 self.inner
136 .last_seen
137 .lock()
138 .unwrap()
139 .insert(key.to_string(), new_value.clone());
140
141 let prev = self.inner.current.load_full();
143 let mut next = (*prev).clone();
144 mutator(&mut next);
145 self.inner.current.store(Arc::new(next));
146
147 drop(backend);
149 drop(_writer);
150
151 self.broadcast(ChangeEvent::Set {
153 key: key.into(),
154 old_value,
155 new_value,
156 source: ChangeSource::Local,
157 timestamp: SystemTime::now(),
158 });
159
160 Ok(())
161 }
162}
163
164fn noop_external_applier<T>() -> ExternalApplier<T> {
165 Box::new(|_, _, _| ApplyResult::Ignored)
166}
167
168impl<T> Clone for SettingsHandle<T> {
169 fn clone(&self) -> Self {
170 Self {
171 inner: Arc::clone(&self.inner),
172 }
173 }
174}
175
176impl<T> Drop for SettingsInner<T> {
177 fn drop(&mut self) {
178 let _ = self.diff_shutdown_tx.send(());
180 if let Some(handle) = self.diff_thread.take() {
183 let _ = handle.join();
184 }
185 }
186}
187
188fn inner_broadcast<T>(inner: &SettingsInner<T>, event: ChangeEvent) {
189 let mut subs = inner.subscribers.lock().unwrap();
190 subs.retain(|tx| tx.send(event.clone()).is_ok());
191}
192
193fn diff_loop<T>(weak: Weak<SettingsInner<T>>, commits_rx: Receiver<()>, shutdown_rx: Receiver<()>)
194where
195 T: Clone + Send + Sync + 'static,
196{
197 loop {
198 select! {
199 recv(shutdown_rx) -> _ => return,
200 recv(commits_rx) -> msg => {
201 if msg.is_err() {
202 return;
203 }
204 let Some(inner) = weak.upgrade() else { return };
205
206 let _writer = inner.write_lock.lock().unwrap();
210 let fresh = {
211 let backend = inner.backend.lock().unwrap();
212 match backend.load_all() {
213 Ok(map) => map,
214 Err(_) => continue,
215 }
216 };
217
218 let mut last_seen = inner.last_seen.lock().unwrap();
219 let current = inner.current.load_full();
220 let mut next = (*current).clone();
221 let mut should_store_next = false;
222 let mut events = Vec::new();
223
224 for (key, new_value) in &fresh {
226 let old_value = last_seen.get(key).cloned();
227 if old_value.as_ref() != Some(new_value) {
228 match (inner.external_applier)(&mut next, key, new_value) {
229 ApplyResult::Applied => {
230 should_store_next = true;
231 events.push(ChangeEvent::Set {
232 key: key.clone(),
233 old_value,
234 new_value: new_value.clone(),
235 source: ChangeSource::External,
236 timestamp: SystemTime::now(),
237 });
238 }
239 ApplyResult::AppliedWithValue { value } => {
240 should_store_next = true;
241 events.push(ChangeEvent::Set {
242 key: key.clone(),
243 old_value,
244 new_value: value,
245 source: ChangeSource::External,
246 timestamp: SystemTime::now(),
247 });
248 }
249 ApplyResult::Ignored => {
250 events.push(ChangeEvent::Set {
251 key: key.clone(),
252 old_value,
253 new_value: new_value.clone(),
254 source: ChangeSource::External,
255 timestamp: SystemTime::now(),
256 });
257 }
258 ApplyResult::DeserializeFailure { raw, error } => {
259 events.push(ChangeEvent::DeserializeFailure {
260 key: key.clone(),
261 raw,
262 error,
263 source: ChangeSource::External,
264 timestamp: SystemTime::now(),
265 });
266 }
267 }
268 }
269 }
270 for (key, old_value) in last_seen.iter() {
272 if !fresh.contains_key(key) {
273 events.push(ChangeEvent::Deleted {
274 key: key.clone(),
275 old_value: old_value.clone(),
276 source: ChangeSource::External,
277 timestamp: SystemTime::now(),
278 });
279 }
280 }
281
282 if should_store_next {
283 inner.current.store(Arc::new(next));
284 }
285 *last_seen = fresh;
286 drop(last_seen);
287 drop(_writer);
288
289 for event in events {
290 inner_broadcast(&inner, event);
291 }
292 }
293 }
294 }
295}
296
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, SystemTime};

    use crate::{BackendError, ChangeSource, StoredValue};

    /// In-memory backend whose contents and commit channel can be driven
    /// directly by a test.
    struct MockBackend {
        data: Arc<Mutex<HashMap<String, StoredValue>>>,
        commits_tx: Sender<()>,
        commits_rx: Receiver<()>,
    }

    /// Backend that stores nothing and only counts `load_all` calls.
    struct CountingBackend {
        load_count: Arc<Mutex<usize>>,
    }

    impl MockBackend {
        fn new() -> Self {
            let (commits_tx, commits_rx) = unbounded();
            Self {
                data: Arc::default(),
                commits_tx,
                commits_rx,
            }
        }

        /// Shared access to the stored map, for simulating external writes.
        fn data(&self) -> Arc<Mutex<HashMap<String, StoredValue>>> {
            self.data.clone()
        }

        /// Sender used to simulate a commit notification.
        fn commit_signal(&self) -> Sender<()> {
            Sender::clone(&self.commits_tx)
        }
    }

    impl StorageBackend for MockBackend {
        fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
            let contents = self.data.lock().unwrap().clone();
            Ok(contents)
        }

        fn set(&self, key: &str, value: &StoredValue) -> Result<(), BackendError> {
            let mut contents = self.data.lock().unwrap();
            contents.insert(key.to_owned(), value.clone());
            Ok(())
        }

        fn delete(&self, key: &str) -> Result<(), BackendError> {
            let mut contents = self.data.lock().unwrap();
            contents.remove(key);
            Ok(())
        }

        fn watch_changes(&self) -> Option<Receiver<()>> {
            Some(self.commits_rx.clone())
        }
    }

    impl CountingBackend {
        fn new(load_count: Arc<Mutex<usize>>) -> Self {
            Self { load_count }
        }
    }

    impl StorageBackend for CountingBackend {
        fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
            let mut calls = self.load_count.lock().unwrap();
            *calls += 1;
            Ok(HashMap::new())
        }

        fn set(&self, _key: &str, _value: &StoredValue) -> Result<(), BackendError> {
            Ok(())
        }

        fn delete(&self, _key: &str) -> Result<(), BackendError> {
            Ok(())
        }

        fn watch_changes(&self) -> Option<Receiver<()>> {
            None
        }
    }

    /// A local `Set` event for the key `"theme"`.
    fn sample_event() -> ChangeEvent {
        ChangeEvent::Set {
            key: "theme".into(),
            old_value: None,
            new_value: StoredValue::encode(&"dark").unwrap(),
            source: ChangeSource::Local,
            timestamp: SystemTime::now(),
        }
    }

    /// Applier shared by the external-change tests: decodes `"the_value"` as
    /// `u32` into the state and ignores every other key.
    fn u32_applier() -> ExternalApplier<u32> {
        Box::new(|state: &mut u32, key: &str, value: &StoredValue| {
            if key != "the_value" {
                return ApplyResult::Ignored;
            }

            match value.decode::<u32>() {
                Ok(decoded) => {
                    *state = decoded;
                    ApplyResult::Applied
                }
                Err(error) => ApplyResult::DeserializeFailure {
                    raw: value.as_str().to_string(),
                    error: error.to_string(),
                },
            }
        })
    }

    #[test]
    fn test_snapshot_returns_initial_value() {
        let handle = SettingsHandle::new(42, Box::new(MockBackend::new()));

        let snapshot = handle.snapshot();

        assert_eq!(*snapshot, 42);
    }

    #[test]
    fn test_clone_shares_state() {
        let original = SettingsHandle::new(42, Box::new(MockBackend::new()));
        let copy = original.clone();

        let from_original = original.snapshot();
        let from_copy = copy.snapshot();

        // Both handles must hand out the very same allocation.
        assert!(Arc::ptr_eq(&from_original, &from_copy));
        assert_eq!(*from_original, *from_copy);
    }

    #[test]
    fn test_on_change_receives_broadcast_event() {
        let handle = SettingsHandle::new(42, Box::new(MockBackend::new()));
        let subscriber = handle.on_change();

        let event = sample_event();
        handle.broadcast(event.clone());

        let received = subscriber.recv().unwrap();
        assert_eq!(received, event);
    }

    #[test]
    fn test_multiple_subscribers_all_receive() {
        let handle = SettingsHandle::new(42, Box::new(MockBackend::new()));
        let first = handle.on_change();
        let second = handle.on_change();

        handle.broadcast(sample_event().clone());

        assert!(first.try_recv().is_ok());
        assert!(second.try_recv().is_ok());
    }

    #[test]
    fn new_with_stored_uses_provided_last_seen_without_loading_backend() {
        let load_count = Arc::new(Mutex::new(0));
        let backend = Box::new(CountingBackend::new(Arc::clone(&load_count)));
        let mut stored = HashMap::new();
        stored.insert("theme".to_string(), StoredValue::encode(&"dark").unwrap());

        let _handle = SettingsHandle::new_with_stored(42, backend, stored);

        assert_eq!(*load_count.lock().unwrap(), 0);
    }

    #[test]
    fn test_subscriber_is_cleaned_up_on_next_broadcast() {
        let handle = SettingsHandle::new(42, Box::new(MockBackend::new()));

        // The first receiver is dropped right away.
        drop(handle.on_change());
        let survivor = handle.on_change();

        handle.broadcast(sample_event().clone());

        assert!(survivor.try_recv().is_ok());
        let remaining = handle.inner.subscribers.lock().unwrap().len();
        assert_eq!(remaining, 1);
    }

    #[test]
    fn test_write_field_persists_and_broadcasts() {
        let mock = MockBackend::new();
        let backend_data = mock.data();
        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
        let rx = handle.on_change();

        let new_value = StoredValue::encode(&42u32).unwrap();
        let old_value = Some(StoredValue::encode(&0u32).unwrap());

        let outcome = handle.write_field(
            "the_value",
            old_value.clone(),
            new_value.clone(),
            |state| *state = 42u32,
        );
        outcome.unwrap();

        // In-memory state.
        assert_eq!(*handle.snapshot(), 42);

        // Persisted state.
        {
            let stored = backend_data.lock().unwrap();
            assert_eq!(stored.get("the_value"), Some(&new_value));
        }

        // Broadcast event.
        match rx.try_recv().unwrap() {
            ChangeEvent::Set {
                key,
                old_value: reported_old,
                new_value: reported_new,
                source,
                ..
            } => {
                assert_eq!(key, "the_value");
                assert_eq!(reported_old, old_value);
                assert_eq!(reported_new, new_value);
                assert_eq!(source, ChangeSource::Local);
            }
            other => panic!("expected ChangeEvent::Set, got {:?}", other),
        }
    }

    #[test]
    fn test_external_change_emits_external_event() {
        let mock = MockBackend::new();
        let data = mock.data();
        let commit_signal = mock.commit_signal();
        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
        let rx = handle.on_change();

        // Simulate another process writing to the backend and committing.
        let new_value = StoredValue::encode(&42u32).unwrap();
        data.lock()
            .unwrap()
            .insert("the_value".to_string(), new_value.clone());
        commit_signal.send(()).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            ChangeEvent::Set {
                key,
                old_value,
                new_value: reported_new,
                source,
                ..
            } => {
                assert_eq!(key, "the_value");
                assert_eq!(old_value, None);
                assert_eq!(reported_new, new_value);
                assert_eq!(source, ChangeSource::External);
            }
            other => panic!("expected ChangeEvent::Set, got {:?}", other),
        }
    }

    #[test]
    fn test_external_change_updates_snapshot_when_applier_succeeds() {
        let mock = MockBackend::new();
        let data = mock.data();
        let commit_signal = mock.commit_signal();
        let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
            0,
            Box::new(mock),
            HashMap::new(),
            u32_applier(),
        );
        let rx = handle.on_change();

        let new_value = StoredValue::encode(&42u32).unwrap();
        data.lock()
            .unwrap()
            .insert("the_value".to_string(), new_value);
        commit_signal.send(()).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(
            event,
            ChangeEvent::Set {
                source: ChangeSource::External,
                ..
            }
        ));
        assert_eq!(*handle.snapshot(), 42);
    }

    #[test]
    fn test_external_change_emits_deserialize_failure_and_preserves_snapshot() {
        let mock = MockBackend::new();
        let data = mock.data();
        let commit_signal = mock.commit_signal();
        let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
            7,
            Box::new(mock),
            HashMap::new(),
            u32_applier(),
        );
        let rx = handle.on_change();

        // A string where the applier expects a number.
        let undecodable = StoredValue::from_raw("\"not-a-number\"".to_string());
        data.lock()
            .unwrap()
            .insert("the_value".to_string(), undecodable);
        commit_signal.send(()).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            ChangeEvent::DeserializeFailure {
                key, raw, source, ..
            } => {
                assert_eq!(key, "the_value");
                assert_eq!(raw, "\"not-a-number\"");
                assert_eq!(source, ChangeSource::External);
            }
            event => panic!("unexpected event: {event:?}"),
        }
        assert_eq!(*handle.snapshot(), 7);
    }

    #[test]
    fn test_local_write_does_not_re_emit_as_external() {
        let mock = MockBackend::new();
        let commit_signal = mock.commit_signal();
        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
        let rx = handle.on_change();

        let new_value = StoredValue::encode(&42u32).unwrap();
        handle
            .write_field("the_value", None, new_value.clone(), |state| *state = 42u32)
            .unwrap();

        // The local write itself is reported once, as Local.
        match rx.try_recv().unwrap() {
            ChangeEvent::Set { source, .. } => assert_eq!(source, ChangeSource::Local),
            other => panic!("expected Local Set, got {:?}", other),
        }

        // A commit notification for that same write must not produce a
        // second, External event.
        commit_signal.send(()).unwrap();

        let result = rx.recv_timeout(Duration::from_millis(500));
        assert!(
            result.is_err(),
            "expected timeout (no External event), got {:?}",
            result
        );
    }
}