mod active_observation;
pub mod aggregate;
use crate::active_observation::ActiveObservation;
use std::fmt::Display;
use std::sync::{Arc, Mutex, MutexGuard};
/// State shared between a [`Value`] and every [`Observer`] created from it.
///
/// It is only ever reached through the `Arc<Mutex<..>>` held by those handles.
#[derive(Debug)]
struct Shared<T> {
    /// The current value. `Value`'s `Drop` impl sets this to `None`, which
    /// observers report as a hang-up.
    value: Option<T>,
    /// Observations registered by observers that found nothing new. They are
    /// taken out and dropped by `Value::set` and when the `Value` is dropped.
    active_observations: Vec<ActiveObservation>,
}
/// A shared value whose changes can be observed.
///
/// Observers are created with [`Value::observe`]. Dropping the `Value` clears
/// the stored value, after which observers report [`ObserverError::Hungup`].
#[derive(Debug)]
pub struct Value<T> {
/// State shared with every observer created from this value.
shared: Arc<Mutex<Shared<T>>>,
}
impl<T> Value<T> {
pub fn new(value: T) -> Self {
Self {
shared: Arc::new(Mutex::new(Shared {
value: Some(value),
active_observations: Vec::new(),
})),
}
}
pub fn get(&self) -> T
where
T: Clone,
{
self.shared
.lock()
.unwrap()
.value
.clone()
.expect("Value is hungup")
}
pub fn set(&self, value: T) -> T {
let mut lock = self.shared.lock().unwrap();
let old = lock.value.replace(value);
let observers = std::mem::take(&mut lock.active_observations);
drop(lock); Self::notify(observers);
old.expect("Value is hungup")
}
fn notify(who: Vec<ActiveObservation>) {
drop(who);
}
pub fn observe(&self) -> Observer<T> {
Observer::new(self)
}
}
impl<T> Drop for Value<T> {
    /// Hangs up: clears the stored value and releases every registered
    /// observation so that waiting observers can notice the hang-up.
    fn drop(&mut self) {
        // A destructor must not panic: if this runs while the thread is
        // already unwinding, a second panic aborts the process. A poisoned
        // mutex is therefore tolerated here; the hang-up is still recorded.
        let mut lock = match self.shared.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        lock.value = None;
        let observers = std::mem::take(&mut lock.active_observations);
        // Release the lock before dropping the observations, mirroring `set`.
        drop(lock);
        drop(observers);
    }
}
/// Errors reported by an [`Observer`].
///
/// The enum is `#[non_exhaustive]`, so downstream crates must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ObserverError {
    /// No value is available any more because the observed `Value` has been
    /// dropped.
    Hungup,
}
/// A handle for observing the value held by a [`Value`].
///
/// It remembers the value it last handed out so that later calls can tell
/// whether the stored value differs from it.
#[derive(Debug, Clone)]
pub struct Observer<T> {
/// State shared with the originating `Value` and its other observers.
shared: Arc<Mutex<Shared<T>>>,
/// The value most recently returned by this observer, or `None` if nothing
/// has been observed yet.
observed: Option<T>,
}
impl<T> Observer<T> {
    /// Creates an observer attached to `value` that has not observed anything
    /// yet.
    pub fn new(value: &Value<T>) -> Self {
        Self {
            shared: Arc::clone(&value.shared),
            observed: None,
        }
    }

    /// Returns a clone of the current value and records it as the last
    /// observed value.
    ///
    /// # Errors
    ///
    /// Returns [`ObserverError::Hungup`] if no value is stored; the last
    /// observed value is left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn current_value(&mut self) -> Result<T, ObserverError>
    where
        T: Clone,
    {
        // The guard is a temporary of this statement, so the lock is already
        // released when `self.observed` is updated below.
        let snapshot = self.shared.lock().unwrap().value.clone();
        let current = snapshot.ok_or(ObserverError::Hungup)?;
        self.observed = Some(current.clone());
        Ok(current)
    }

    /// Returns the next value.
    ///
    /// If nothing has been observed yet, or the stored value differs from the
    /// last observed one, this returns immediately. Otherwise an observation
    /// is registered and the call waits for it to complete, then returns
    /// [`Observer::current_value`].
    ///
    /// Note that after waiting, the value is re-read without comparing it to
    /// the previously observed value.
    ///
    /// # Errors
    ///
    /// Returns [`ObserverError::Hungup`] if no value is stored, or the error
    /// produced by the awaited observation.
    pub async fn next(&mut self) -> Result<T, ObserverError>
    where
        T: Clone + PartialEq,
    {
        let wake_up = match self.next_when_immediately_available() {
            Ok(ready) => return ready,
            Err(mut guard) => {
                // Register while still holding the lock handed back by the
                // check. The guard is dropped at the end of this arm, i.e.
                // before the `.await` below.
                let (observation, wake_up) = active_observation::observation();
                guard.active_observations.push(observation);
                wake_up
            }
        };
        let outcome = wake_up.await;
        match outcome {
            Err(e) => Err(e),
            Ok(_) => self.current_value(),
        }
    }

    /// Checks, without waiting, whether a new value can be reported.
    ///
    /// * `Ok(Ok(value))`: a value that was never observed, or that differs
    ///   from the last observed one; it is now recorded as observed.
    /// * `Ok(Err(ObserverError::Hungup))`: no value is stored.
    /// * `Err(guard)`: the stored value equals the last observed one. The
    ///   still-held lock is returned so the caller can register an
    ///   observation under the same lock acquisition as the comparison.
    fn next_when_immediately_available(
        &mut self,
    ) -> Result<Result<T, ObserverError>, MutexGuard<Shared<T>>>
    where
        T: PartialEq + Clone,
    {
        // Nothing observed so far: whatever is stored right now counts as new.
        if self.observed.is_none() {
            return Ok(self.current_value());
        }

        let guard = self.shared.lock().unwrap();
        let changed = guard.value.as_ref() != self.observed.as_ref();
        if !changed {
            return Err(guard);
        }

        let latest = guard.value.clone();
        match latest {
            Some(fresh) => {
                self.observed = Some(fresh.clone());
                Ok(Ok(fresh))
            }
            None => Ok(Err(ObserverError::Hungup)),
        }
    }

    /// Poll step used by the aggregate machinery.
    ///
    /// If an answer is immediately available, `observer` is handed back
    /// together with it. Otherwise `observer` is registered in the shared
    /// state and `Err(())` is returned.
    pub(crate) fn aggregate_poll_impl(
        &mut self,
        observer: ActiveObservation,
    ) -> Result<(ActiveObservation, Result<T, ObserverError>), ()>
    where
        T: PartialEq + Clone,
    {
        match self.next_when_immediately_available() {
            Err(mut guard) => {
                guard.active_observations.push(observer);
                Err(())
            }
            Ok(outcome) => Ok((observer, outcome)),
        }
    }

    /// Returns `true` if an answer (a new value or a hang-up) is immediately
    /// available. A new value is recorded as observed as a side effect.
    pub(crate) fn observe_if_distinct(&mut self) -> bool
    where
        T: PartialEq + Clone,
    {
        self.next_when_immediately_available().is_ok()
    }

    /// Returns `true` if the stored value differs from the last observed one,
    /// or if no value is stored. Does not modify the observer.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn is_dirty(&self) -> bool
    where
        T: PartialEq,
    {
        let guard = self.shared.lock().unwrap();
        match guard.value.as_ref() {
            None => true,
            Some(current) => self.observed.as_ref() != Some(current),
        }
    }
}
impl<T: Default> Default for Value<T> {
    /// Creates a `Value` holding `T::default()`.
    fn default() -> Self {
        let initial = T::default();
        Self::new(initial)
    }
}
impl<T: Display + Clone> Display for Value<T> {
    /// Formats as `Value(<current value>)`, using a clone of the current
    /// value obtained through `get`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current = self.get();
        write!(f, "Value({})", current)
    }
}
impl<T> From<T> for Value<T> {
fn from(value: T) -> Self {
Self::new(value)
}
}
impl Display for ObserverError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ObserverError::Hungup => "Observer hung up",
        };
        f.write_str(message)
    }
}
// Marker impl so `ObserverError` can be used wherever a `std::error::Error`
// is expected; all trait methods keep their default implementations.
impl std::error::Error for ObserverError {}
impl<T> From<Value<T>> for Observer<T> {
fn from(value: Value<T>) -> Self {
value.observe()
}
}
// Unit tests for `Value` and `Observer`. The async tests are driven by the
// `async_test` attribute from the `test_executors` crate.
#[cfg(test)]
mod tests {
use test_executors::async_test;
// `get` returns the stored value and `set` returns the value it replaced.
#[test]
fn test_value() {
let value = super::Value::new(42);
assert_eq!(value.get(), 42);
let old_value = value.set(100);
assert_eq!(old_value, 42);
assert_eq!(value.get(), 100);
}
// `current_value` reflects the latest value written through `set`.
#[test]
fn test_observer() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
value.set(100);
assert_eq!(observer.current_value().unwrap(), 100);
}
// `next` returns immediately when the value already changed, and otherwise
// waits until another thread calls `set`.
#[async_test]
async fn test_observer_next() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
value.set(100);
// The value already differs from the last observed one: no waiting needed.
let next_value = observer.next().await.unwrap();
assert_eq!(next_value, 100);
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(100));
value.set(200);
// Forget the value so its `Drop` impl never runs; dropping it here would
// clear the stored value and the observer could see a hang-up instead of 200.
std::mem::forget(value); });
// Nothing new yet: this call has to wait for the thread above.
let next_value = observer.next().await.unwrap();
assert_eq!(next_value, 200);
}
// Dropping the `Value` while an observer is waiting makes `next` fail, and
// every later call keeps failing.
#[async_test]
async fn drop_value() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(100));
drop(value);
});
let result = observer.next().await;
assert!(result.is_err());
// The hang-up is permanent: a second call must report an error as well.
let result2 = observer.next().await;
assert!(
result2.is_err(),
"Expected error after value drop, got: {:?}",
result2
);
}
}