Skip to main content

attune_core/
handle.rs

1use arc_swap::ArcSwap;
2use crossbeam_channel::{Receiver, Sender, select, unbounded};
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, Weak};
5use std::thread::{self, JoinHandle};
6use std::time::SystemTime;
7
8use crate::{ChangeEvent, ChangeSource, SettingsError, StorageBackend, StoredValue};
9
10pub type ExternalApplier<T> =
11    Box<dyn Fn(&mut T, &str, &StoredValue) -> ApplyResult + Send + Sync + 'static>;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum ApplyResult {
15    Applied,
16    AppliedWithValue { value: StoredValue },
17    Ignored,
18    DeserializeFailure { raw: String, error: String },
19}
20
21pub struct SettingsHandle<T> {
22    inner: Arc<SettingsInner<T>>,
23}
24
25struct SettingsInner<T> {
26    current: ArcSwap<T>,
27    write_lock: Mutex<()>,
28    backend: Mutex<Box<dyn StorageBackend>>,
29    external_applier: ExternalApplier<T>,
30    subscribers: Mutex<Vec<Sender<ChangeEvent>>>,
31    // Last-known DB state. Updated by `write_field` on local writes (so the diff
32    // loop doesn't re-emit them as External) and by the diff loop after each tick.
33    last_seen: Mutex<HashMap<String, StoredValue>>,
34    diff_shutdown_tx: Sender<()>,
35    diff_thread: Option<JoinHandle<()>>,
36}
37
38impl<T> SettingsHandle<T>
39where
40    T: Clone + Send + Sync + 'static,
41{
42    /// Creates a settings handle and initializes diff state from the backend.
43    pub fn new(initial: T, backend: Box<dyn StorageBackend>) -> Self {
44        let initial_last_seen = backend.load_all().unwrap_or_default();
45        Self::new_with_stored(initial, backend, initial_last_seen)
46    }
47
48    /// Creates a settings handle with caller-provided persisted values.
49    ///
50    /// This constructor is used when startup code has already loaded persisted
51    /// values to resolve the initial settings snapshot. Passing the same map
52    /// into the handle keeps the cross-process diff state aligned with that
53    /// snapshot without reading the backend a second time.
54    pub fn new_with_stored(
55        initial: T,
56        backend: Box<dyn StorageBackend>,
57        stored: HashMap<String, StoredValue>,
58    ) -> Self {
59        Self::new_with_stored_and_applier(initial, backend, stored, noop_external_applier())
60    }
61
62    /// Creates a settings handle with caller-provided persisted values and an external applier.
63    ///
64    /// This constructor is used by generated settings code that has already
65    /// loaded stored values and can also map persisted storage keys back onto
66    /// fields in `T`. The applier is called when the cross-process diff loop
67    /// observes an external stored-value change; successful applications update
68    /// the in-memory snapshot before the corresponding change event is
69    /// broadcast.
70    pub fn new_with_stored_and_applier(
71        initial: T,
72        backend: Box<dyn StorageBackend>,
73        stored: HashMap<String, StoredValue>,
74        external_applier: ExternalApplier<T>,
75    ) -> Self {
76        let (diff_shutdown_tx, diff_shutdown_rx) = unbounded::<()>();
77        let commits_rx = backend.watch_changes();
78
79        let inner = Arc::new_cyclic(move |weak: &Weak<SettingsInner<T>>| {
80            let diff_thread = if let Some(commits_rx) = commits_rx {
81                let weak = weak.clone();
82                Some(thread::spawn(move || {
83                    diff_loop(weak, commits_rx, diff_shutdown_rx);
84                }))
85            } else {
86                None
87            };
88
89            SettingsInner {
90                current: ArcSwap::from_pointee(initial),
91                write_lock: Mutex::new(()),
92                backend: Mutex::new(backend),
93                external_applier,
94                subscribers: Mutex::new(Vec::new()),
95                last_seen: Mutex::new(stored),
96                diff_shutdown_tx,
97                diff_thread,
98            }
99        });
100
101        Self { inner }
102    }
103
104    pub fn snapshot(&self) -> Arc<T> {
105        self.inner.current.load_full()
106    }
107
108    pub fn on_change(&self) -> Receiver<ChangeEvent> {
109        let (tx, rx) = unbounded();
110        self.inner.subscribers.lock().unwrap().push(tx);
111        rx
112    }
113
114    fn broadcast(&self, event: ChangeEvent) {
115        inner_broadcast(&self.inner, event);
116    }
117
118    pub fn write_field(
119        &self,
120        key: &str,
121        old_value: Option<StoredValue>,
122        new_value: StoredValue,
123        mutator: impl FnOnce(&mut T),
124    ) -> Result<(), SettingsError> {
125        // 1. Serialize writers.
126        let _writer = self.inner.write_lock.lock().unwrap();
127
128        // 2. Lock the backend.
129        let backend = self.inner.backend.lock().unwrap();
130
131        // 3. Persist before memory update.
132        backend.set(key, &new_value)?;
133
134        // 4. Update last_seen so the diff loop won't re-emit this change as External.
135        self.inner
136            .last_seen
137            .lock()
138            .unwrap()
139            .insert(key.to_string(), new_value.clone());
140
141        // 5. Clone, mutate, store via ArcSwap.
142        let prev = self.inner.current.load_full();
143        let mut next = (*prev).clone();
144        mutator(&mut next);
145        self.inner.current.store(Arc::new(next));
146
147        // 6. Release locks before broadcasting.
148        drop(backend);
149        drop(_writer);
150
151        // 7. Broadcast a Local-source Set event.
152        self.broadcast(ChangeEvent::Set {
153            key: key.into(),
154            old_value,
155            new_value,
156            source: ChangeSource::Local,
157            timestamp: SystemTime::now(),
158        });
159
160        Ok(())
161    }
162}
163
164fn noop_external_applier<T>() -> ExternalApplier<T> {
165    Box::new(|_, _, _| ApplyResult::Ignored)
166}
167
168impl<T> Clone for SettingsHandle<T> {
169    fn clone(&self) -> Self {
170        Self {
171            inner: Arc::clone(&self.inner),
172        }
173    }
174}
175
176impl<T> Drop for SettingsInner<T> {
177    fn drop(&mut self) {
178        // Wake the diff thread so it exits immediately rather than waiting on a tick.
179        let _ = self.diff_shutdown_tx.send(());
180        // Join the thread. Errors are swallowed — a panicked thread shouldn't
181        // propagate during Drop.
182        if let Some(handle) = self.diff_thread.take() {
183            let _ = handle.join();
184        }
185    }
186}
187
188fn inner_broadcast<T>(inner: &SettingsInner<T>, event: ChangeEvent) {
189    let mut subs = inner.subscribers.lock().unwrap();
190    subs.retain(|tx| tx.send(event.clone()).is_ok());
191}
192
193fn diff_loop<T>(weak: Weak<SettingsInner<T>>, commits_rx: Receiver<()>, shutdown_rx: Receiver<()>)
194where
195    T: Clone + Send + Sync + 'static,
196{
197    loop {
198        select! {
199            recv(shutdown_rx) -> _ => return,
200            recv(commits_rx) -> msg => {
201                if msg.is_err() {
202                    return;
203                }
204                let Some(inner) = weak.upgrade() else { return };
205
206                // Serialize behind in-flight local writers. By the time we get the
207                // write_lock, any local writer has already updated last_seen, so
208                // its change won't appear as a delta below.
209                let _writer = inner.write_lock.lock().unwrap();
210                let fresh = {
211                    let backend = inner.backend.lock().unwrap();
212                    match backend.load_all() {
213                        Ok(map) => map,
214                        Err(_) => continue,
215                    }
216                };
217
218                let mut last_seen = inner.last_seen.lock().unwrap();
219                let current = inner.current.load_full();
220                let mut next = (*current).clone();
221                let mut should_store_next = false;
222                let mut events = Vec::new();
223
224                // Emit events for changed/new keys.
225                for (key, new_value) in &fresh {
226                    let old_value = last_seen.get(key).cloned();
227                    if old_value.as_ref() != Some(new_value) {
228                        match (inner.external_applier)(&mut next, key, new_value) {
229                            ApplyResult::Applied => {
230                                should_store_next = true;
231                                events.push(ChangeEvent::Set {
232                                    key: key.clone(),
233                                    old_value,
234                                    new_value: new_value.clone(),
235                                    source: ChangeSource::External,
236                                    timestamp: SystemTime::now(),
237                                });
238                            }
239                            ApplyResult::AppliedWithValue { value } => {
240                                should_store_next = true;
241                                events.push(ChangeEvent::Set {
242                                    key: key.clone(),
243                                    old_value,
244                                    new_value: value,
245                                    source: ChangeSource::External,
246                                    timestamp: SystemTime::now(),
247                                });
248                            }
249                            ApplyResult::Ignored => {
250                                events.push(ChangeEvent::Set {
251                                    key: key.clone(),
252                                    old_value,
253                                    new_value: new_value.clone(),
254                                    source: ChangeSource::External,
255                                    timestamp: SystemTime::now(),
256                                });
257                            }
258                            ApplyResult::DeserializeFailure { raw, error } => {
259                                events.push(ChangeEvent::DeserializeFailure {
260                                    key: key.clone(),
261                                    raw,
262                                    error,
263                                    source: ChangeSource::External,
264                                    timestamp: SystemTime::now(),
265                                });
266                            }
267                        }
268                    }
269                }
270                // Emit events for deleted keys.
271                for (key, old_value) in last_seen.iter() {
272                    if !fresh.contains_key(key) {
273                        events.push(ChangeEvent::Deleted {
274                                key: key.clone(),
275                                old_value: old_value.clone(),
276                                source: ChangeSource::External,
277                                timestamp: SystemTime::now(),
278                        });
279                    }
280                }
281
282                if should_store_next {
283                    inner.current.store(Arc::new(next));
284                }
285                *last_seen = fresh;
286                drop(last_seen);
287                drop(_writer);
288
289                for event in events {
290                    inner_broadcast(&inner, event);
291                }
292            }
293        }
294    }
295}
296
297#[cfg(test)]
298mod test {
299    use super::*;
300    use std::time::{Duration, SystemTime};
301
302    use crate::{BackendError, ChangeSource, StoredValue};
303
304    struct MockBackend {
305        data: Arc<Mutex<HashMap<String, StoredValue>>>,
306        commits_tx: Sender<()>,
307        commits_rx: Receiver<()>,
308    }
309
310    struct CountingBackend {
311        load_count: Arc<Mutex<usize>>,
312    }
313
314    impl MockBackend {
315        fn new() -> Self {
316            let (commits_tx, commits_rx) = unbounded();
317            Self {
318                data: Arc::new(Mutex::new(HashMap::new())),
319                commits_tx,
320                commits_rx,
321            }
322        }
323
324        fn data(&self) -> Arc<Mutex<HashMap<String, StoredValue>>> {
325            Arc::clone(&self.data)
326        }
327
328        fn commit_signal(&self) -> Sender<()> {
329            self.commits_tx.clone()
330        }
331    }
332
333    impl StorageBackend for MockBackend {
334        fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
335            Ok(self.data.lock().unwrap().clone())
336        }
337
338        fn set(&self, key: &str, value: &StoredValue) -> Result<(), BackendError> {
339            self.data
340                .lock()
341                .unwrap()
342                .insert(key.to_string(), value.clone());
343            Ok(())
344        }
345
346        fn delete(&self, key: &str) -> Result<(), BackendError> {
347            self.data.lock().unwrap().remove(key);
348            Ok(())
349        }
350
351        fn watch_changes(&self) -> Option<Receiver<()>> {
352            Some(self.commits_rx.clone())
353        }
354    }
355
356    impl CountingBackend {
357        fn new(load_count: Arc<Mutex<usize>>) -> Self {
358            Self { load_count }
359        }
360    }
361
362    impl StorageBackend for CountingBackend {
363        fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
364            *self.load_count.lock().unwrap() += 1;
365            Ok(HashMap::new())
366        }
367
368        fn set(&self, _key: &str, _value: &StoredValue) -> Result<(), BackendError> {
369            Ok(())
370        }
371
372        fn delete(&self, _key: &str) -> Result<(), BackendError> {
373            Ok(())
374        }
375
376        fn watch_changes(&self) -> Option<Receiver<()>> {
377            None
378        }
379    }
380
381    fn sample_event() -> ChangeEvent {
382        ChangeEvent::Set {
383            key: "theme".into(),
384            old_value: None,
385            new_value: StoredValue::encode(&"dark").unwrap(),
386            source: ChangeSource::Local,
387            timestamp: SystemTime::now(),
388        }
389    }
390
391    #[test]
392    fn test_snapshot_returns_initial_value() {
393        let backend = Box::new(MockBackend::new());
394        let handle = SettingsHandle::new(42, backend);
395        let snap = handle.snapshot();
396        assert_eq!(*snap, 42)
397    }
398
399    #[test]
400    fn test_clone_shares_state() {
401        let backend = Box::new(MockBackend::new());
402        let handle = SettingsHandle::new(42, backend);
403        let clone = handle.clone();
404        let s1 = handle.snapshot();
405        let s2 = clone.snapshot();
406        // Both Arcs should point to the same allocation.
407        assert!(Arc::ptr_eq(&s1, &s2));
408        // Both Arcs should point to the same value.
409        assert_eq!(*s1, *s2)
410    }
411
412    #[test]
413    fn test_on_change_receives_broadcast_event() {
414        let backend = Box::new(MockBackend::new());
415        let handle = SettingsHandle::new(42, backend);
416
417        let rx = handle.on_change();
418
419        let event = sample_event();
420        handle.broadcast(event.clone());
421
422        let received = rx.recv().unwrap();
423        assert_eq!(received, event)
424    }
425
426    #[test]
427    fn test_multiple_subscribers_all_receive() {
428        let backend = Box::new(MockBackend::new());
429        let handle = SettingsHandle::new(42, backend);
430
431        let rx1 = handle.on_change();
432        let rx2 = handle.on_change();
433
434        let event = sample_event();
435        handle.broadcast(event.clone());
436
437        assert!(rx1.try_recv().is_ok());
438        assert!(rx2.try_recv().is_ok());
439    }
440
441    #[test]
442    fn new_with_stored_uses_provided_last_seen_without_loading_backend() {
443        let load_count = Arc::new(Mutex::new(0));
444        let backend = Box::new(CountingBackend::new(Arc::clone(&load_count)));
445        let mut stored = HashMap::new();
446        stored.insert("theme".to_string(), StoredValue::encode(&"dark").unwrap());
447
448        let _handle = SettingsHandle::new_with_stored(42, backend, stored);
449
450        assert_eq!(*load_count.lock().unwrap(), 0);
451    }
452
453    #[test]
454    fn test_subscriber_is_cleaned_up_on_next_broadcast() {
455        let backend = Box::new(MockBackend::new());
456        let handle = SettingsHandle::new(42, backend);
457
458        {
459            let _rx1 = handle.on_change();
460        }
461        let rx2 = handle.on_change();
462
463        let event = sample_event();
464        handle.broadcast(event.clone());
465
466        assert!(rx2.try_recv().is_ok());
467        assert_eq!(handle.inner.subscribers.lock().unwrap().len(), 1)
468    }
469
470    #[test]
471    fn test_write_field_persists_and_broadcasts() {
472        let mock = MockBackend::new();
473        let backend_data = mock.data();
474        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
475        let rx = handle.on_change();
476
477        let new_value = StoredValue::encode(&42u32).unwrap();
478        let old_value = Some(StoredValue::encode(&0u32).unwrap());
479
480        handle
481            .write_field("the_value", old_value.clone(), new_value.clone(), |state| {
482                *state = 42u32
483            })
484            .unwrap();
485
486        // 1. In-memory state updated.
487        assert_eq!(*handle.snapshot(), 42);
488
489        // 2. Backend received the write.
490        let stored = backend_data.lock().unwrap();
491        assert_eq!(stored.get("the_value"), Some(&new_value));
492        drop(stored);
493
494        // 3. Subscriber received the right event.
495        let event = rx.try_recv().unwrap();
496        match event {
497            ChangeEvent::Set {
498                key,
499                old_value: old,
500                new_value: new,
501                source,
502                ..
503            } => {
504                assert_eq!(key, "the_value");
505                assert_eq!(old, old_value);
506                assert_eq!(new, new_value);
507                assert_eq!(source, ChangeSource::Local);
508            }
509            other => panic!("expected ChangeEvent::Set, got {:?}", other),
510        }
511    }
512
513    #[test]
514    fn test_external_change_emits_external_event() {
515        let mock = MockBackend::new();
516        let data = mock.data();
517        let commit_signal = mock.commit_signal();
518        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
519        let rx = handle.on_change();
520
521        // Simulate an external write: insert directly into the backend's storage,
522        // bypassing the handle. Then signal "a commit happened".
523        let new_value = StoredValue::encode(&42u32).unwrap();
524        data.lock()
525            .unwrap()
526            .insert("the_value".to_string(), new_value.clone());
527        commit_signal.send(()).unwrap();
528
529        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
530        match event {
531            ChangeEvent::Set {
532                key,
533                old_value,
534                new_value: new,
535                source,
536                ..
537            } => {
538                assert_eq!(key, "the_value");
539                assert_eq!(old_value, None);
540                assert_eq!(new, new_value);
541                assert_eq!(source, ChangeSource::External);
542            }
543            other => panic!("expected ChangeEvent::Set, got {:?}", other),
544        }
545    }
546
547    #[test]
548    fn test_external_change_updates_snapshot_when_applier_succeeds() {
549        let mock = MockBackend::new();
550        let data = mock.data();
551        let commit_signal = mock.commit_signal();
552        let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
553            0,
554            Box::new(mock),
555            HashMap::new(),
556            Box::new(|state, key, value| {
557                if key != "the_value" {
558                    return ApplyResult::Ignored;
559                }
560
561                match value.decode::<u32>() {
562                    Ok(decoded) => {
563                        *state = decoded;
564                        ApplyResult::Applied
565                    }
566                    Err(error) => ApplyResult::DeserializeFailure {
567                        raw: value.as_str().to_string(),
568                        error: error.to_string(),
569                    },
570                }
571            }),
572        );
573        let rx = handle.on_change();
574
575        let new_value = StoredValue::encode(&42u32).unwrap();
576        data.lock()
577            .unwrap()
578            .insert("the_value".to_string(), new_value);
579        commit_signal.send(()).unwrap();
580
581        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
582        assert!(matches!(
583            event,
584            ChangeEvent::Set {
585                source: ChangeSource::External,
586                ..
587            }
588        ));
589        assert_eq!(*handle.snapshot(), 42);
590    }
591
592    #[test]
593    fn test_external_change_emits_deserialize_failure_and_preserves_snapshot() {
594        let mock = MockBackend::new();
595        let data = mock.data();
596        let commit_signal = mock.commit_signal();
597        let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
598            7,
599            Box::new(mock),
600            HashMap::new(),
601            Box::new(|state, key, value| {
602                if key != "the_value" {
603                    return ApplyResult::Ignored;
604                }
605
606                match value.decode::<u32>() {
607                    Ok(decoded) => {
608                        *state = decoded;
609                        ApplyResult::Applied
610                    }
611                    Err(error) => ApplyResult::DeserializeFailure {
612                        raw: value.as_str().to_string(),
613                        error: error.to_string(),
614                    },
615                }
616            }),
617        );
618        let rx = handle.on_change();
619
620        data.lock().unwrap().insert(
621            "the_value".to_string(),
622            StoredValue::from_raw("\"not-a-number\"".to_string()),
623        );
624        commit_signal.send(()).unwrap();
625
626        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
627        match event {
628            ChangeEvent::DeserializeFailure {
629                key, raw, source, ..
630            } => {
631                assert_eq!(key, "the_value");
632                assert_eq!(raw, "\"not-a-number\"");
633                assert_eq!(source, ChangeSource::External);
634            }
635            event => panic!("unexpected event: {event:?}"),
636        }
637        assert_eq!(*handle.snapshot(), 7);
638    }
639
640    #[test]
641    fn test_local_write_does_not_re_emit_as_external() {
642        let mock = MockBackend::new();
643        let commit_signal = mock.commit_signal();
644        let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
645        let rx = handle.on_change();
646
647        let new_value = StoredValue::encode(&42u32).unwrap();
648        handle
649            .write_field("the_value", None, new_value.clone(), |state| *state = 42u32)
650            .unwrap();
651
652        // Drain the Local event from write_field.
653        let first = rx.try_recv().unwrap();
654        match first {
655            ChangeEvent::Set { source, .. } => assert_eq!(source, ChangeSource::Local),
656            other => panic!("expected Local Set, got {:?}", other),
657        }
658
659        // Now signal the diff loop. Since last_seen was updated by write_field,
660        // the diff loop should find no delta and emit nothing.
661        commit_signal.send(()).unwrap();
662
663        // Give the diff loop a moment to run, then assert no further events.
664        let result = rx.recv_timeout(Duration::from_millis(500));
665        assert!(
666            result.is_err(),
667            "expected timeout (no External event), got {:?}",
668            result
669        );
670    }
671}