use std::{future::Future, panic};
use crate::Subscriber;
#[derive(Debug, Clone, Copy)]
pub enum OnChanged {
Continue,
Abort,
}
pub async fn observe_changes<T>(
mut subscriber: Subscriber<T>,
mut on_changed_fn: impl FnMut(&T) -> OnChanged,
) {
while let Ok(next_changed_ref) = subscriber.read_changed().await {
let result =
panic::catch_unwind(panic::AssertUnwindSafe(|| on_changed_fn(&next_changed_ref)));
drop(next_changed_ref);
match result {
Ok(on_changed) => match on_changed {
OnChanged::Continue => {
}
OnChanged::Abort => {
break;
}
},
Err(panicked) => {
panic::resume_unwind(panicked);
}
}
}
}
pub async fn capture_changes<S, T>(
mut subscriber: Subscriber<S>,
initial_value: T,
mut capture_changed_value_fn: impl FnMut(&mut T, &S) -> bool,
mut on_changed_value_fn: impl FnMut(&T) -> OnChanged,
) {
let mut value = initial_value;
loop {
{
let Ok(next_changed_ref) = subscriber.read_changed().await else {
break;
};
match panic::catch_unwind(panic::AssertUnwindSafe(|| {
capture_changed_value_fn(&mut value, &next_changed_ref)
})) {
Ok(true) => (),
Ok(false) => {
continue;
}
Err(panicked) => {
drop(next_changed_ref);
panic::resume_unwind(panicked);
}
}
};
match on_changed_value_fn(&value) {
OnChanged::Continue => {
}
OnChanged::Abort => {
return;
}
}
}
}
#[expect(clippy::manual_async_fn)] pub fn capture_changes_async<S, T, F>(
mut subscriber: Subscriber<S>,
initial_value: T,
mut capture_changed_value_fn: impl FnMut(&mut T, &S) -> bool + Send,
mut on_changed_value_fn: impl FnMut(&T) -> F + Send,
) -> impl Future<Output = ()> + Send
where
S: Send + Sync,
T: Send,
F: Future<Output = OnChanged> + Send,
{
async move {
let capture_changed_value_fn = &mut capture_changed_value_fn;
let on_changed_value_fn = &mut on_changed_value_fn;
let mut value = initial_value;
loop {
{
let Ok(next_changed_ref) = subscriber.read_changed().await else {
break;
};
match panic::catch_unwind(panic::AssertUnwindSafe(|| {
capture_changed_value_fn(&mut value, &next_changed_ref)
})) {
Ok(true) => (),
Ok(false) => {
continue;
}
Err(panicked) => {
drop(next_changed_ref);
panic::resume_unwind(panicked);
}
}
};
match on_changed_value_fn(&value).await {
OnChanged::Continue => {
}
OnChanged::Abort => {
return;
}
}
}
}
}