pub mod aggregate;
pub(crate) mod flip_card;
use crate::flip_card::FlipCard;
use atomic_waker::AtomicWaker;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
struct ActiveObservation {
id: u64,
notify: AtomicWaker,
}
impl ActiveObservation {
fn notify(&self) {
self.notify.wake();
}
fn register(&self, waker: &Waker) {
self.notify.register(waker);
}
}
impl Debug for ActiveObservation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ActiveObservation(id: {})", self.id)
}
}
#[derive(Debug)]
struct Shared<T> {
next_observer_id: AtomicU64,
value: FlipCard<Option<T>>,
active_observations: treiber_stack::TreiberStack<Weak<ActiveObservation>>,
}
impl<T> Shared<T> {
fn notify(&self) {
for orig in self.active_observations.drain() {
if let Some(active) = orig.upgrade() {
self.active_observations.push_arc(orig);
active.notify();
} else {
}
}
}
}
#[derive(Debug)]
pub struct Value<T: Clone> {
shared: Arc<Shared<T>>,
}
impl<T: Clone> Value<T> {
pub fn new(value: T) -> Self {
Self {
shared: Arc::new(Shared {
value: FlipCard::new(Some(value)),
active_observations: treiber_stack::TreiberStack::default(),
next_observer_id: AtomicU64::new(0),
}),
}
}
pub fn get(&self) -> T
where
T: Clone,
{
self.shared.value.read().expect("Value is hungup")
}
pub fn set(&self, value: T) -> T
where
T: Clone,
{
let old = self.shared.value.flip_to(Some(value));
self.notify();
old.expect("Value is hungup")
}
fn notify(&self) {
self.shared.notify();
}
pub fn observe(&self) -> Observer<T> {
Observer::new(self)
}
}
impl<T: Clone> Drop for Value<T> {
fn drop(&mut self) {
self.shared.value.flip_to(None);
self.notify();
}
}
#[derive(Debug)]
pub struct Observer<T> {
active_observation: Arc<ActiveObservation>,
shared: Arc<Shared<T>>,
observed: Option<T>,
observer_id: u64,
}
impl<T: Clone> Clone for Observer<T> {
fn clone(&self) -> Self {
let observer_id = self.shared.next_observer_id.fetch_add(1, Relaxed);
let active = Arc::new(ActiveObservation {
id: observer_id,
notify: AtomicWaker::new(),
});
self.shared
.active_observations
.push(Arc::downgrade(&active));
Self {
active_observation: active,
shared: self.shared.clone(),
observed: self.observed.clone(),
observer_id,
}
}
}
impl<T> futures_core::Stream for Observer<T>
where
T: PartialEq + Clone + Unpin,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.active_observation.register(cx.waker());
match self.get_mut().next_when_immediately_available() {
Ok(v) => Poll::Ready(v),
Err(_) => Poll::Pending,
}
}
}
impl<T> Observer<T> {
pub fn new(value: &Value<T>) -> Self
where
T: Clone,
{
let observer_id = value.shared.next_observer_id.fetch_add(1, Relaxed);
let active = Arc::new(ActiveObservation {
id: observer_id,
notify: AtomicWaker::new(),
});
value
.shared
.active_observations
.push(Arc::downgrade(&active));
let shared = value.shared.clone();
Self {
shared,
observed: None,
observer_id,
active_observation: active,
}
}
pub fn current_value(&mut self) -> Option<T>
where
T: Clone,
{
let observed = self.shared.value.read();
if let Some(obs) = observed {
self.observed = Some(obs.clone());
Some(obs)
} else {
None
}
}
fn next_when_immediately_available(&mut self) -> Result<Option<T>, ()>
where
T: PartialEq + Clone,
{
let observe = self.shared.value.read();
if let Some(observe) = observe {
if let Some(last) = &self.observed {
if &observe == last {
Err(())
} else {
self.observed = Some(observe.clone());
Ok(Some(observe))
}
} else {
self.observed = Some(observe.clone());
Ok(Some(observe))
}
} else {
Ok(None)
}
}
pub(crate) fn observe_if_distinct(&mut self) -> bool
where
T: PartialEq + Clone,
{
let r = self.next_when_immediately_available();
match r {
Ok(..) => true, Err(_) => false, }
}
pub fn is_dirty(&self) -> bool
where
T: PartialEq + Clone,
{
match &self.shared.value.read() {
Some(value) => {
self.observed.as_ref() != Some(value)
}
None => true, }
}
}
impl<T> Drop for Observer<T> {
fn drop(&mut self) {
let mut extra = Vec::new();
while let Some(orig) = self.shared.active_observations.pop() {
if let Some(active) = orig.upgrade() {
if active.id == self.observer_id {
break;
} else {
extra.push((orig, active));
}
}
}
for (orig, active) in extra {
self.shared.active_observations.push_arc(orig);
active.notify();
}
}
}
impl<T: Clone> Default for Value<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> Display for Value<T>
where
T: Display + Clone,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Value({})", self.get())
}
}
impl<T: Clone> From<T> for Value<T> {
fn from(value: T) -> Self {
Self::new(value)
}
}
impl<T: Clone> From<Value<T>> for Observer<T> {
fn from(value: Value<T>) -> Self {
value.observe()
}
}
impl<T> Display for Observer<T>
where
T: Clone,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Observer(id: {})", self.observer_id)
}
}
impl<T> PartialEq for Value<T>
where
T: PartialEq + Clone,
{
fn eq(&self, other: &Self) -> bool {
self.get() == other.get()
}
}
impl<T> Eq for Value<T> where T: Eq + Clone {}
impl<T> std::hash::Hash for Value<T>
where
T: std::hash::Hash + Clone,
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.get().hash(state);
}
}
#[cfg(test)]
mod tests {
use futures_util::StreamExt;
use test_executors::async_test;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
#[cfg(not(target_arch = "wasm32"))]
use std::thread;
#[cfg(target_arch = "wasm32")]
use wasm_thread as thread;
#[test]
fn test_value() {
let value = super::Value::new(42);
assert_eq!(value.get(), 42);
let old_value = value.set(100);
assert_eq!(old_value, 42);
assert_eq!(value.get(), 100);
}
#[test]
fn test_observer() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
value.set(100);
assert_eq!(observer.current_value().unwrap(), 100);
}
#[async_test]
async fn test_observer_next() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
value.set(100);
let next_value = observer.next().await.unwrap();
assert_eq!(next_value, 100);
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(100));
value.set(200);
std::mem::forget(value); });
let next_value = observer.next().await.unwrap();
assert_eq!(next_value, 200);
}
#[async_test]
async fn drop_value() {
let value = super::Value::new(42);
let mut observer = value.observe();
assert_eq!(observer.current_value().unwrap(), 42);
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(100));
drop(value);
});
let result = observer.next().await;
assert!(result.is_none());
let result2 = observer.next().await;
assert!(
result2.is_none(),
"Expected error after value drop, got: {:?}",
result2
);
}
#[test]
fn test_observer_clone_drop_loop() {
let value = super::Value::new(42);
let observer = value.observe();
for _ in 0..300 {
let clone = observer.clone();
drop(clone);
}
}
#[test]
fn test_value_partialeq() {
let value1 = super::Value::new(42);
let value2 = super::Value::new(42);
let value3 = super::Value::new(100);
assert_eq!(value1, value2);
assert_ne!(value1, value3);
value2.set(100);
assert_eq!(value2, value3);
assert_ne!(value1, value2);
}
#[test]
fn test_value_hash() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let value1 = super::Value::new(42);
let value2 = super::Value::new(42);
let value3 = super::Value::new(100);
let mut hasher1 = DefaultHasher::new();
value1.hash(&mut hasher1);
let hash1 = hasher1.finish();
let mut hasher2 = DefaultHasher::new();
value2.hash(&mut hasher2);
let hash2 = hasher2.finish();
let mut hasher3 = DefaultHasher::new();
value3.hash(&mut hasher3);
let hash3 = hasher3.finish();
assert_eq!(hash1, hash2, "Equal values should have equal hashes");
assert_ne!(
hash1, hash3,
"Different values should have different hashes"
);
value2.set(100);
let mut hasher4 = DefaultHasher::new();
value2.hash(&mut hasher4);
let hash4 = hasher4.finish();
assert_eq!(
hash3, hash4,
"Value with same content should have same hash"
);
assert_ne!(
hash1, hash4,
"Value after update should have different hash"
);
}
#[test]
fn test_observer_display() {
let value = super::Value::new(42);
let observer = value.observe();
let display_str = format!("{}", observer);
assert!(display_str.starts_with("Observer(id:"));
}
}