use arc_swap::ArcSwap;
use crossbeam_channel::{Receiver, Sender, select, unbounded};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::SystemTime;
use crate::{ChangeEvent, ChangeSource, SettingsError, StorageBackend, StoredValue};
/// Callback used to fold an externally observed backend change into the in-memory state.
///
/// It receives a mutable working copy of the state, the key that changed and the value now
/// stored under that key, and reports what it did through [`ApplyResult`]. The diff thread
/// calls it once per key whose stored value differs from the last value it saw.
pub type ExternalApplier<T> =
Box<dyn Fn(&mut T, &str, &StoredValue) -> ApplyResult + Send + Sync + 'static>;
/// Outcome reported by an [`ExternalApplier`] for a single changed key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApplyResult {
/// The state was updated; the diff thread publishes the new snapshot and emits a `Set`
/// event carrying the value read from the backend.
Applied,
/// The state was updated; the diff thread publishes the new snapshot and emits a `Set`
/// event carrying `value` instead of the value read from the backend.
AppliedWithValue { value: StoredValue },
/// The applier did not act on this key. A `Set` event is still emitted, but this outcome
/// alone does not cause a new snapshot to be published.
Ignored,
/// The stored value could not be decoded; reported to subscribers as a
/// `DeserializeFailure` event carrying `raw` and `error`.
DeserializeFailure { raw: String, error: String },
}
/// Cheaply cloneable handle to a shared settings state of type `T`.
///
/// All clones point at the same [`SettingsInner`], so they share the snapshot, the storage
/// backend and the subscriber list.
pub struct SettingsHandle<T> {
// Shared state; the `Drop` impl of `SettingsInner` runs when the last reference goes away.
inner: Arc<SettingsInner<T>>,
}
/// Shared state behind every [`SettingsHandle`] clone and the background diff thread.
struct SettingsInner<T> {
// Latest published snapshot; replaced wholesale by `write_field` and by the diff thread.
current: ArcSwap<T>,
// Held by `write_field` and by the diff thread for the duration of an update.
write_lock: Mutex<()>,
// Persistent storage the settings are written to and reloaded from.
backend: Mutex<Box<dyn StorageBackend>>,
// Callback that folds externally changed values into a working copy of the state.
external_applier: ExternalApplier<T>,
// One sender per `on_change` subscription; senders whose `send` fails are removed on broadcast.
subscribers: Mutex<Vec<Sender<ChangeEvent>>>,
// Values last known to be in the backend; the diff thread compares fresh loads against this.
last_seen: Mutex<HashMap<String, StoredValue>>,
// Signalled on drop to tell the diff thread to stop.
diff_shutdown_tx: Sender<()>,
// Handle of the diff thread; `None` when the backend offers no change notifications.
diff_thread: Option<JoinHandle<()>>,
}
impl<T> SettingsHandle<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates a handle whose "last seen" map is seeded from `backend.load_all()`.
    ///
    /// A failing `load_all` is treated as an empty store rather than an error, so the first
    /// diff after construction reports every stored key as new.
    pub fn new(initial: T, backend: Box<dyn StorageBackend>) -> Self {
        let initial_last_seen = backend.load_all().unwrap_or_default();
        Self::new_with_stored(initial, backend, initial_last_seen)
    }

    /// Creates a handle with a caller-supplied "last seen" map and an applier that ignores
    /// every external change. The backend is not loaded.
    pub fn new_with_stored(
        initial: T,
        backend: Box<dyn StorageBackend>,
        stored: HashMap<String, StoredValue>,
    ) -> Self {
        Self::new_with_stored_and_applier(initial, backend, stored, noop_external_applier())
    }

    /// Creates a handle with a caller-supplied "last seen" map and `external_applier`.
    ///
    /// When `backend.watch_changes()` yields a receiver, a background thread is started that
    /// reloads the backend on every notification and reports the differences to subscribers.
    pub fn new_with_stored_and_applier(
        initial: T,
        backend: Box<dyn StorageBackend>,
        stored: HashMap<String, StoredValue>,
        external_applier: ExternalApplier<T>,
    ) -> Self {
        let (diff_shutdown_tx, diff_shutdown_rx) = unbounded::<()>();
        // Gate that keeps the diff thread parked until the `Arc` is fully constructed.
        // Inside `Arc::new_cyclic` the weak pointer cannot be upgraded yet, and `diff_loop`
        // treats a failed upgrade as "owner dropped" and exits for good. Without the gate, a
        // commit notification that is already pending would kill the thread at startup.
        let (ready_tx, ready_rx) = unbounded::<()>();
        let commits_rx = backend.watch_changes();
        let inner = Arc::new_cyclic(move |weak: &Weak<SettingsInner<T>>| {
            let diff_thread = commits_rx.map(|commits_rx| {
                let weak = weak.clone();
                thread::spawn(move || {
                    // `Err` means construction was abandoned (the sender was dropped
                    // without signalling), so there is nothing to watch.
                    if ready_rx.recv().is_ok() {
                        diff_loop(weak, commits_rx, diff_shutdown_rx);
                    }
                })
            });
            SettingsInner {
                current: ArcSwap::from_pointee(initial),
                write_lock: Mutex::new(()),
                backend: Mutex::new(backend),
                external_applier,
                subscribers: Mutex::new(Vec::new()),
                last_seen: Mutex::new(stored),
                diff_shutdown_tx,
                diff_thread,
            }
        });
        // The weak pointer is upgradable from here on. Sending fails only when no diff
        // thread was started, which is fine to ignore.
        let _ = ready_tx.send(());
        Self { inner }
    }

    /// Returns the currently published state.
    pub fn snapshot(&self) -> Arc<T> {
        self.inner.current.load_full()
    }

    /// Registers a new subscriber and returns the receiving end of its event channel.
    ///
    /// The subscription is removed on the first broadcast after the receiver is dropped.
    pub fn on_change(&self) -> Receiver<ChangeEvent> {
        let (tx, rx) = unbounded();
        self.inner.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends `event` to every live subscriber.
    fn broadcast(&self, event: ChangeEvent) {
        inner_broadcast(&self.inner, event);
    }

    /// Persists `new_value` under `key`, applies `mutator` to a copy of the current state,
    /// publishes that copy as the new snapshot and broadcasts a local `Set` event.
    ///
    /// `old_value` is not verified against the backend; it is only forwarded in the event.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `backend.set`. In that case neither the snapshot nor
    /// the "last seen" map is touched and no event is sent.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned.
    pub fn write_field(
        &self,
        key: &str,
        old_value: Option<StoredValue>,
        new_value: StoredValue,
        mutator: impl FnOnce(&mut T),
    ) -> Result<(), SettingsError> {
        let writer_guard = self.inner.write_lock.lock().unwrap();
        let backend = self.inner.backend.lock().unwrap();
        backend.set(key, &new_value)?;
        // Record the value as already seen so the diff thread does not report this write
        // back to subscribers as an external change.
        self.inner
            .last_seen
            .lock()
            .unwrap()
            .insert(key.to_string(), new_value.clone());
        let prev = self.inner.current.load_full();
        let mut next = (*prev).clone();
        mutator(&mut next);
        self.inner.current.store(Arc::new(next));
        // Release the locks before notifying so subscribers are never signalled while the
        // update is still in progress.
        drop(backend);
        drop(writer_guard);
        self.broadcast(ChangeEvent::Set {
            key: key.into(),
            old_value,
            new_value,
            source: ChangeSource::Local,
            timestamp: SystemTime::now(),
        });
        Ok(())
    }
}
fn noop_external_applier<T>() -> ExternalApplier<T> {
Box::new(|_, _, _| ApplyResult::Ignored)
}
impl<T> Clone for SettingsHandle<T> {
    /// Returns another handle to the same shared state; `T` itself is not cloned.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<T> Drop for SettingsInner<T> {
    /// Signals the diff thread to stop and waits for it to finish.
    fn drop(&mut self) {
        // A send error means the diff thread is already gone (or never existed).
        let _ = self.diff_shutdown_tx.send(());
        let Some(handle) = self.diff_thread.take() else {
            return;
        };
        // The diff thread briefly holds a strong reference while it processes a commit, so
        // the last reference may be released on that very thread. Joining would then mean
        // a thread joining itself, which panics or deadlocks. Dropping the handle detaches
        // the thread instead; it exits on its own once it sees the shutdown signal.
        if handle.thread().id() == thread::current().id() {
            return;
        }
        // A join error only means the diff thread panicked; there is nothing left to do.
        let _ = handle.join();
    }
}
/// Delivers a copy of `event` to every subscriber and forgets subscribers whose receiving
/// end has been dropped (their `send` fails).
fn inner_broadcast<T>(inner: &SettingsInner<T>, event: ChangeEvent) {
    inner
        .subscribers
        .lock()
        .unwrap()
        .retain(|subscriber| subscriber.send(event.clone()).is_ok());
}
/// Body of the background diff thread.
///
/// Waits for either a shutdown signal or a commit notification. On each notification it
/// reloads the backend, compares the result with the "last seen" map, lets the external
/// applier fold changed values into a copy of the state, and broadcasts one event per
/// changed or deleted key. The loop ends when shutdown is signalled, when the commit channel
/// disconnects, or when the shared state can no longer be upgraded from `weak`.
fn diff_loop<T>(weak: Weak<SettingsInner<T>>, commits_rx: Receiver<()>, shutdown_rx: Receiver<()>)
where
    T: Clone + Send + Sync + 'static,
{
    loop {
        select! {
            recv(shutdown_rx) -> _ => return,
            recv(commits_rx) -> signal => {
                if signal.is_err() {
                    return;
                }
                let Some(inner) = weak.upgrade() else { return };
                // Serialise against `write_field` for the whole diff.
                let writer_guard = inner.write_lock.lock().unwrap();
                let loaded = {
                    let backend = inner.backend.lock().unwrap();
                    backend.load_all()
                };
                // A failed reload is skipped; the next notification tries again.
                let Ok(fresh) = loaded else { continue };
                let mut last_seen = inner.last_seen.lock().unwrap();
                let base = inner.current.load_full();
                let mut next = (*base).clone();
                let mut should_store_next = false;
                let mut events = Vec::new();

                // Added or modified keys.
                for (key, new_value) in &fresh {
                    let previous = last_seen.get(key);
                    if previous == Some(new_value) {
                        continue;
                    }
                    let old_value = previous.cloned();
                    let (snapshot_changed, reported) =
                        match (inner.external_applier)(&mut next, key, new_value) {
                            ApplyResult::Applied => (true, new_value.clone()),
                            ApplyResult::AppliedWithValue { value } => (true, value),
                            ApplyResult::Ignored => (false, new_value.clone()),
                            ApplyResult::DeserializeFailure { raw, error } => {
                                events.push(ChangeEvent::DeserializeFailure {
                                    key: key.clone(),
                                    raw,
                                    error,
                                    source: ChangeSource::External,
                                    timestamp: SystemTime::now(),
                                });
                                continue;
                            }
                        };
                    should_store_next |= snapshot_changed;
                    events.push(ChangeEvent::Set {
                        key: key.clone(),
                        old_value,
                        new_value: reported,
                        source: ChangeSource::External,
                        timestamp: SystemTime::now(),
                    });
                }

                // Keys that were known before but are missing from the fresh load.
                events.extend(
                    last_seen
                        .iter()
                        .filter(|(key, _)| !fresh.contains_key(*key))
                        .map(|(key, old_value)| ChangeEvent::Deleted {
                            key: key.clone(),
                            old_value: old_value.clone(),
                            source: ChangeSource::External,
                            timestamp: SystemTime::now(),
                        }),
                );

                if should_store_next {
                    inner.current.store(Arc::new(next));
                }
                *last_seen = fresh;
                // Release both locks before notifying subscribers.
                drop(last_seen);
                drop(writer_guard);
                for event in events {
                    inner_broadcast(&inner, event);
                }
            }
        }
    }
}
// Unit tests for `SettingsHandle`: snapshots, subscriptions, local writes and the
// external-change path driven by the background diff thread.
#[cfg(test)]
mod test {
use super::*;
use std::time::{Duration, SystemTime};
use crate::{BackendError, ChangeSource, StoredValue};
// In-memory backend whose contents and commit notifications can be driven from a test.
struct MockBackend {
// Stored key/value pairs, shared with the test through `data()`.
data: Arc<Mutex<HashMap<String, StoredValue>>>,
// Sending side of the commit channel, handed to tests through `commit_signal()`.
commits_tx: Sender<()>,
// Receiving side of the commit channel, handed out by `watch_changes()`.
commits_rx: Receiver<()>,
}
// Backend that stores nothing and only counts how often `load_all` is called.
struct CountingBackend {
load_count: Arc<Mutex<usize>>,
}
impl MockBackend {
// Creates an empty backend with a fresh commit channel.
fn new() -> Self {
let (commits_tx, commits_rx) = unbounded();
Self {
data: Arc::new(Mutex::new(HashMap::new())),
commits_tx,
commits_rx,
}
}
// Shared access to the stored data, used to simulate writes made by someone else.
fn data(&self) -> Arc<Mutex<HashMap<String, StoredValue>>> {
Arc::clone(&self.data)
}
// Sender used by tests to announce that the stored data changed.
fn commit_signal(&self) -> Sender<()> {
self.commits_tx.clone()
}
}
impl StorageBackend for MockBackend {
fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
Ok(self.data.lock().unwrap().clone())
}
fn set(&self, key: &str, value: &StoredValue) -> Result<(), BackendError> {
self.data
.lock()
.unwrap()
.insert(key.to_string(), value.clone());
Ok(())
}
fn delete(&self, key: &str) -> Result<(), BackendError> {
self.data.lock().unwrap().remove(key);
Ok(())
}
fn watch_changes(&self) -> Option<Receiver<()>> {
Some(self.commits_rx.clone())
}
}
impl CountingBackend {
fn new(load_count: Arc<Mutex<usize>>) -> Self {
Self { load_count }
}
}
impl StorageBackend for CountingBackend {
// Bumps the shared counter and reports an empty store.
fn load_all(&self) -> Result<HashMap<String, StoredValue>, BackendError> {
*self.load_count.lock().unwrap() += 1;
Ok(HashMap::new())
}
fn set(&self, _key: &str, _value: &StoredValue) -> Result<(), BackendError> {
Ok(())
}
fn delete(&self, _key: &str) -> Result<(), BackendError> {
Ok(())
}
// No change notifications, so no diff thread is started for this backend.
fn watch_changes(&self) -> Option<Receiver<()>> {
None
}
}
// A local `Set` event used by the broadcast tests.
fn sample_event() -> ChangeEvent {
ChangeEvent::Set {
key: "theme".into(),
old_value: None,
new_value: StoredValue::encode(&"dark").unwrap(),
source: ChangeSource::Local,
timestamp: SystemTime::now(),
}
}
// The snapshot of a fresh handle is the initial value.
#[test]
fn test_snapshot_returns_initial_value() {
let backend = Box::new(MockBackend::new());
let handle = SettingsHandle::new(42, backend);
let snap = handle.snapshot();
assert_eq!(*snap, 42)
}
// A cloned handle returns the very same snapshot allocation as the original.
#[test]
fn test_clone_shares_state() {
let backend = Box::new(MockBackend::new());
let handle = SettingsHandle::new(42, backend);
let clone = handle.clone();
let s1 = handle.snapshot();
let s2 = clone.snapshot();
assert!(Arc::ptr_eq(&s1, &s2));
assert_eq!(*s1, *s2)
}
// A subscriber receives exactly the event that was broadcast.
#[test]
fn test_on_change_receives_broadcast_event() {
let backend = Box::new(MockBackend::new());
let handle = SettingsHandle::new(42, backend);
let rx = handle.on_change();
let event = sample_event();
handle.broadcast(event.clone());
let received = rx.recv().unwrap();
assert_eq!(received, event)
}
// Every registered subscriber gets a copy of a broadcast event.
#[test]
fn test_multiple_subscribers_all_receive() {
let backend = Box::new(MockBackend::new());
let handle = SettingsHandle::new(42, backend);
let rx1 = handle.on_change();
let rx2 = handle.on_change();
let event = sample_event();
handle.broadcast(event.clone());
assert!(rx1.try_recv().is_ok());
assert!(rx2.try_recv().is_ok());
}
// `new_with_stored` must not call `load_all` on the backend.
#[test]
fn new_with_stored_uses_provided_last_seen_without_loading_backend() {
let load_count = Arc::new(Mutex::new(0));
let backend = Box::new(CountingBackend::new(Arc::clone(&load_count)));
let mut stored = HashMap::new();
stored.insert("theme".to_string(), StoredValue::encode(&"dark").unwrap());
let _handle = SettingsHandle::new_with_stored(42, backend, stored);
assert_eq!(*load_count.lock().unwrap(), 0);
}
// A subscriber whose receiver was dropped is removed by the next broadcast.
#[test]
fn test_subscriber_is_cleaned_up_on_next_broadcast() {
let backend = Box::new(MockBackend::new());
let handle = SettingsHandle::new(42, backend);
{
// Dropped at the end of this scope, leaving a dead sender in the subscriber list.
let _rx1 = handle.on_change();
}
let rx2 = handle.on_change();
let event = sample_event();
handle.broadcast(event.clone());
assert!(rx2.try_recv().is_ok());
assert_eq!(handle.inner.subscribers.lock().unwrap().len(), 1)
}
// `write_field` updates the snapshot, stores the value in the backend and emits a local `Set`.
#[test]
fn test_write_field_persists_and_broadcasts() {
let mock = MockBackend::new();
let backend_data = mock.data();
let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
let rx = handle.on_change();
let new_value = StoredValue::encode(&42u32).unwrap();
let old_value = Some(StoredValue::encode(&0u32).unwrap());
handle
.write_field("the_value", old_value.clone(), new_value.clone(), |state| {
*state = 42u32
})
.unwrap();
assert_eq!(*handle.snapshot(), 42);
let stored = backend_data.lock().unwrap();
assert_eq!(stored.get("the_value"), Some(&new_value));
drop(stored);
let event = rx.try_recv().unwrap();
match event {
ChangeEvent::Set {
key,
old_value: old,
new_value: new,
source,
..
} => {
assert_eq!(key, "the_value");
assert_eq!(old, old_value);
assert_eq!(new, new_value);
assert_eq!(source, ChangeSource::Local);
}
other => panic!("expected ChangeEvent::Set, got {:?}", other),
}
}
// A value written directly to the backend, followed by a commit signal, is reported as an
// external `Set` even though the default applier ignores it.
#[test]
fn test_external_change_emits_external_event() {
let mock = MockBackend::new();
let data = mock.data();
let commit_signal = mock.commit_signal();
let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
let rx = handle.on_change();
let new_value = StoredValue::encode(&42u32).unwrap();
// Bypass the handle to simulate another writer, then notify the diff thread.
data.lock()
.unwrap()
.insert("the_value".to_string(), new_value.clone());
commit_signal.send(()).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
ChangeEvent::Set {
key,
old_value,
new_value: new,
source,
..
} => {
assert_eq!(key, "the_value");
assert_eq!(old_value, None);
assert_eq!(new, new_value);
assert_eq!(source, ChangeSource::External);
}
other => panic!("expected ChangeEvent::Set, got {:?}", other),
}
}
// When the applier decodes and applies the external value, the snapshot reflects it.
#[test]
fn test_external_change_updates_snapshot_when_applier_succeeds() {
let mock = MockBackend::new();
let data = mock.data();
let commit_signal = mock.commit_signal();
let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
0,
Box::new(mock),
HashMap::new(),
Box::new(|state, key, value| {
if key != "the_value" {
return ApplyResult::Ignored;
}
match value.decode::<u32>() {
Ok(decoded) => {
*state = decoded;
ApplyResult::Applied
}
Err(error) => ApplyResult::DeserializeFailure {
raw: value.as_str().to_string(),
error: error.to_string(),
},
}
}),
);
let rx = handle.on_change();
let new_value = StoredValue::encode(&42u32).unwrap();
data.lock()
.unwrap()
.insert("the_value".to_string(), new_value);
commit_signal.send(()).unwrap();
// The event is broadcast after the snapshot is stored, so receiving it means the
// snapshot can be checked.
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert!(matches!(
event,
ChangeEvent::Set {
source: ChangeSource::External,
..
}
));
assert_eq!(*handle.snapshot(), 42);
}
// An undecodable external value yields a `DeserializeFailure` event and leaves the snapshot alone.
#[test]
fn test_external_change_emits_deserialize_failure_and_preserves_snapshot() {
let mock = MockBackend::new();
let data = mock.data();
let commit_signal = mock.commit_signal();
let handle: SettingsHandle<u32> = SettingsHandle::new_with_stored_and_applier(
7,
Box::new(mock),
HashMap::new(),
Box::new(|state, key, value| {
if key != "the_value" {
return ApplyResult::Ignored;
}
match value.decode::<u32>() {
Ok(decoded) => {
*state = decoded;
ApplyResult::Applied
}
Err(error) => ApplyResult::DeserializeFailure {
raw: value.as_str().to_string(),
error: error.to_string(),
},
}
}),
);
let rx = handle.on_change();
// A JSON string where the applier expects a number.
data.lock().unwrap().insert(
"the_value".to_string(),
StoredValue::from_raw("\"not-a-number\"".to_string()),
);
commit_signal.send(()).unwrap();
let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
match event {
ChangeEvent::DeserializeFailure {
key, raw, source, ..
} => {
assert_eq!(key, "the_value");
assert_eq!(raw, "\"not-a-number\"");
assert_eq!(source, ChangeSource::External);
}
event => panic!("unexpected event: {event:?}"),
}
assert_eq!(*handle.snapshot(), 7);
}
// A value written through `write_field` is already recorded as seen, so a later commit
// signal must not produce a second, external event for it.
#[test]
fn test_local_write_does_not_re_emit_as_external() {
let mock = MockBackend::new();
let commit_signal = mock.commit_signal();
let handle: SettingsHandle<u32> = SettingsHandle::new(0, Box::new(mock));
let rx = handle.on_change();
let new_value = StoredValue::encode(&42u32).unwrap();
handle
.write_field("the_value", None, new_value.clone(), |state| *state = 42u32)
.unwrap();
let first = rx.try_recv().unwrap();
match first {
ChangeEvent::Set { source, .. } => assert_eq!(source, ChangeSource::Local),
other => panic!("expected Local Set, got {:?}", other),
}
commit_signal.send(()).unwrap();
// Absence of an event can only be observed by waiting for the timeout to expire.
let result = rx.recv_timeout(Duration::from_millis(500));
assert!(
result.is_err(),
"expected timeout (no External event), got {:?}",
result
);
}
}