use std::collections::HashMap;
use std::fmt;
use std::panic;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::RwLock;
use thiserror::Error;
use tokio::task::{self, JoinError};
/// Errors produced by the observable-property API in this file.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. `timestamp_ms` fields are not part of that text; they are
/// emitted by `diagnostic_info`.
#[derive(Error, Debug, Clone)]
pub enum PropertyError {
/// A read lock could not be acquired. Built by `read_lock_error`.
#[error("Failed to acquire read lock during '{operation}': {context}")]
ReadLockError {
/// Name of the operation that was in progress.
operation: String,
/// Free-form detail supplied by whoever built the error.
context: String,
/// Milliseconds since the Unix epoch; the helper constructor fills this in
/// at creation time.
timestamp_ms: u64,
},
/// A write lock could not be acquired. Built by `write_lock_error`.
#[error("Failed to acquire write lock during '{operation}': {context}")]
WriteLockError {
/// Name of the operation that was in progress.
operation: String,
/// Free-form detail supplied by whoever built the error.
context: String,
/// Milliseconds since the Unix epoch; the helper constructor fills this in
/// at creation time.
timestamp_ms: u64,
},
/// No observer is registered under the given id (returned by `unsubscribe`).
#[error("Observer with ID {id} not found")]
ObserverNotFound {
/// The id that was looked up.
id: ObserverId,
},
/// A lock was found poisoned. Built by `lock_poisoned`.
#[error("Lock poisoned during '{operation}': {context}")]
LockPoisoned {
/// Name of the operation that was in progress.
operation: String,
/// Free-form detail supplied by whoever built the error.
context: String,
/// Milliseconds since the Unix epoch; the helper constructor fills this in
/// at creation time.
timestamp_ms: u64,
},
/// An observer callback panicked. Built by `observer_panic`.
#[error("Observer {observer_id} panicked: {error}")]
ObserverPanic {
/// Id of the observer that panicked.
observer_id: ObserverId,
/// Textual description of the panic.
error: String,
/// Milliseconds since the Unix epoch; the helper constructor fills this in
/// at creation time.
timestamp_ms: u64,
},
/// An observer failed without panicking.
#[error("Observer execution failed: {reason}")]
ObserverError {
/// Human-readable failure description.
reason: String,
},
/// A Tokio runtime problem.
#[error("Tokio runtime error: {reason}")]
TokioError {
/// Human-readable failure description.
reason: String,
},
/// A spawned task could not be joined; the payload is the join error's text.
#[error("Task join error: {0}")]
JoinError(String),
/// A configured limit was reached (returned by `subscribe` when the observer
/// count has reached `max_observers`).
#[error("Capacity exceeded: current={current}, max={max}, resource={resource}")]
CapacityExceeded {
/// Amount in use when the request was rejected.
current: usize,
/// Configured limit.
max: usize,
/// Name of the limited resource, e.g. `"observers"`.
resource: String,
},
/// An operation ran longer than its threshold.
#[error("Operation '{operation}' timed out: {elapsed_ms}ms > {threshold_ms}ms")]
OperationTimeout {
/// Name of the operation that timed out.
operation: String,
/// Time the operation actually took, in milliseconds.
elapsed_ms: u64,
/// Limit that was exceeded, in milliseconds.
threshold_ms: u64,
},
/// The property is shutting down.
#[error("Property is shutting down")]
ShutdownInProgress,
}
impl PropertyError {
    /// Renders the error as one pipe-delimited `KEY | field=value | ...` line
    /// suitable for structured logs.
    ///
    /// Unlike the `Display` text, this includes the `timestamp_ms` fields and
    /// derived figures (`utilization` for capacity errors, `overage_ms` for
    /// timeouts).
    pub fn diagnostic_info(&self) -> String {
        match self {
            Self::ReadLockError { operation, context, timestamp_ms } => {
                format!(
                    "READ_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
                    operation, context, timestamp_ms
                )
            }
            Self::WriteLockError { operation, context, timestamp_ms } => {
                format!(
                    "WRITE_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
                    operation, context, timestamp_ms
                )
            }
            Self::LockPoisoned { operation, context, timestamp_ms } => {
                format!(
                    "LOCK_POISONED | operation={} | context={} | timestamp_ms={}",
                    operation, context, timestamp_ms
                )
            }
            Self::ObserverPanic { observer_id, error, timestamp_ms } => {
                format!(
                    "OBSERVER_PANIC | observer_id={} | error={} | timestamp_ms={}",
                    observer_id, error, timestamp_ms
                )
            }
            Self::ObserverNotFound { id } => {
                format!("OBSERVER_NOT_FOUND | id={}", id)
            }
            Self::CapacityExceeded { current, max, resource } => {
                // A zero-capacity resource is always full. Dividing by zero here
                // would print `NaN%` or `inf%` instead of a usable number.
                let utilization = if *max == 0 {
                    100.0
                } else {
                    (*current as f64 / *max as f64) * 100.0
                };
                format!(
                    "CAPACITY_EXCEEDED | resource={} | current={} | max={} | utilization={:.1}%",
                    resource, current, max, utilization
                )
            }
            Self::OperationTimeout { operation, elapsed_ms, threshold_ms } => {
                format!(
                    "OPERATION_TIMEOUT | operation={} | elapsed_ms={} | threshold_ms={} | overage_ms={}",
                    operation,
                    elapsed_ms,
                    threshold_ms,
                    // Saturate so an inconsistent pair never underflows.
                    elapsed_ms.saturating_sub(*threshold_ms)
                )
            }
            Self::ShutdownInProgress => {
                "SHUTDOWN_IN_PROGRESS | property is shutting down".to_string()
            }
            Self::ObserverError { reason } => {
                format!("OBSERVER_ERROR | reason={}", reason)
            }
            Self::TokioError { reason } => {
                format!("TOKIO_ERROR | reason={}", reason)
            }
            Self::JoinError(msg) => {
                format!("JOIN_ERROR | message={}", msg)
            }
        }
    }

    /// Wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Returns 0 if the system clock reads earlier than the epoch.
    fn current_timestamp_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // `as_millis` yields u128; clamp before narrowing so the cast can
            // never wrap around.
            .map(|d| d.as_millis().min(u128::from(u64::MAX)) as u64)
            .unwrap_or(0)
    }

    /// Builds a [`PropertyError::ReadLockError`] stamped with the current time.
    pub fn read_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
        Self::ReadLockError {
            operation: operation.into(),
            context: context.into(),
            timestamp_ms: Self::current_timestamp_ms(),
        }
    }

    /// Builds a [`PropertyError::WriteLockError`] stamped with the current time.
    pub fn write_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
        Self::WriteLockError {
            operation: operation.into(),
            context: context.into(),
            timestamp_ms: Self::current_timestamp_ms(),
        }
    }

    /// Builds a [`PropertyError::LockPoisoned`] stamped with the current time.
    pub fn lock_poisoned(operation: impl Into<String>, context: impl Into<String>) -> Self {
        Self::LockPoisoned {
            operation: operation.into(),
            context: context.into(),
            timestamp_ms: Self::current_timestamp_ms(),
        }
    }

    /// Builds a [`PropertyError::ObserverPanic`] stamped with the current time.
    pub fn observer_panic(observer_id: ObserverId, error: impl Into<String>) -> Self {
        Self::ObserverPanic {
            observer_id,
            error: error.into(),
            timestamp_ms: Self::current_timestamp_ms(),
        }
    }
}
/// Tunable limits for an `ObservableProperty`.
#[derive(Debug, Clone)]
pub struct PropertyConfig {
    /// Maximum number of observers that may be registered at once; `subscribe`
    /// rejects further registrations with `CapacityExceeded`.
    pub max_observers: usize,
    /// Limit on pending notifications.
    /// NOTE(review): not read by any code in the visible part of this file.
    pub max_pending_notifications: usize,
    /// Observer timeout in milliseconds.
    /// NOTE(review): not read by any code in the visible part of this file.
    pub observer_timeout_ms: u64,
    /// Number of permits in the semaphore that gates spawned observer tasks.
    pub max_concurrent_async_tasks: usize,
}

impl Default for PropertyConfig {
    /// 1000 observers, 100 pending notifications, a 5000 ms observer timeout
    /// and 100 concurrent async tasks.
    fn default() -> Self {
        PropertyConfig {
            max_concurrent_async_tasks: 100,
            observer_timeout_ms: 5000,
            max_pending_notifications: 100,
            max_observers: 1000,
        }
    }
}
/// Summary returned by `shutdown_with_timeout`.
#[derive(Debug, Clone)]
pub struct ShutdownReport {
    /// Number of observers that were registered when shutdown began.
    pub observers_cleared: usize,
    /// Elapsed time of the shutdown call.
    pub shutdown_duration: std::time::Duration,
    /// Whether the shutdown finished inside its grace period.
    pub completed_within_timeout: bool,
    /// Wall-clock start of the shutdown, in milliseconds since the Unix epoch.
    pub initiated_at_ms: u64,
}

impl ShutdownReport {
    /// Renders the report as one pipe-delimited `SHUTDOWN_COMPLETE | ...` log line.
    pub fn diagnostic_info(&self) -> String {
        let Self {
            observers_cleared,
            shutdown_duration,
            completed_within_timeout,
            initiated_at_ms,
        } = self;
        let duration_ms = shutdown_duration.as_millis();
        format!(
            "SHUTDOWN_COMPLETE | observers_cleared={} | duration_ms={} | within_timeout={} | initiated_at_ms={}",
            observers_cleared, duration_ms, completed_within_timeout, initiated_at_ms
        )
    }
}
/// Shared, thread-safe change callback. It is invoked as
/// `observer(&old_value, &new_value)` after the property's value has been replaced.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
/// Opaque handle identifying one registered observer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObserverId(pub(crate) usize);

/// Unwraps the id into its raw numeric value.
impl From<ObserverId> for usize {
    fn from(ObserverId(raw): ObserverId) -> Self {
        raw
    }
}

/// Wraps a raw numeric value as an id.
impl From<usize> for ObserverId {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

/// Prints the raw number only; formatter flags such as width are not applied.
impl fmt::Display for ObserverId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ObserverId(raw) = self;
        f.write_fmt(format_args!("{}", raw))
    }
}
/// A value that notifies registered observers whenever it is replaced.
///
/// Cloning produces another handle onto the same state: the value, the
/// observer table and the task semaphore are all behind `Arc`s.
pub struct ObservableProperty<T> {
// Current value plus observer registry, guarded by one read/write lock.
inner: Arc<RwLock<InnerProperty<T>>>,
// Limits applied to this property (e.g. `max_observers` in `subscribe`).
config: PropertyConfig,
// Bounds how many spawned observer tasks run at once; created with
// `config.max_concurrent_async_tasks` permits.
async_task_semaphore: Arc<tokio::sync::Semaphore>,
}
/// Lock-protected state shared by every handle onto one property.
struct InnerProperty<T> {
// The current value.
value: T,
// Registered callbacks keyed by the id handed out at subscription time.
observers: HashMap<ObserverId, Observer<T>>,
// Raw value for the next `ObserverId`; incremented on every successful subscribe.
next_id: usize,
}
/// Lightweight handle onto a property's shared state that can only remove
/// observers.
pub struct PropertyHandle<T: Clone + Send + Sync + 'static> {
    inner: Arc<RwLock<InnerProperty<T>>>,
}

impl<T: Clone + Send + Sync + 'static> PropertyHandle<T> {
    /// Removes the observer registered under `id`.
    ///
    /// Returns `true` if an observer was removed and `false` if the id was
    /// unknown.
    pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
        let removed = self.inner.write().observers.remove(&id);
        removed.is_some()
    }
}
/// RAII token for one registered observer: dropping it unregisters the
/// observer.
pub struct Subscription<T: Clone + Send + Sync + 'static> {
    inner: Arc<RwLock<InnerProperty<T>>>,
    id: ObserverId,
}

impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
    /// Takes the write lock and removes this subscription's observer, if it
    /// is still registered.
    fn drop(&mut self) {
        self.inner.write().observers.remove(&self.id);
    }
}
impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
/// Creates a property holding `initial_value`, using `PropertyConfig::default()`.
pub fn new(initial_value: T) -> Self {
    let config = PropertyConfig::default();
    Self::new_with_config(initial_value, config)
}
/// Creates a property holding `initial_value` with explicit limits.
///
/// The property starts with no observers, and its task semaphore gets
/// `config.max_concurrent_async_tasks` permits.
pub fn new_with_config(initial_value: T, config: PropertyConfig) -> Self {
    let state = InnerProperty {
        value: initial_value,
        observers: HashMap::new(),
        next_id: 0,
    };
    let permits = config.max_concurrent_async_tasks;
    Self {
        inner: Arc::new(RwLock::new(state)),
        async_task_semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
        config,
    }
}
/// Returns a clone of the current value, taken under the read lock.
///
/// The current implementation always returns `Ok`.
pub fn get(&self) -> Result<T, PropertyError> {
    let state = self.inner.read();
    Ok(state.value.clone())
}
/// Borrows the current value without cloning it.
///
/// The returned guard keeps the read lock held for as long as it is alive,
/// so anything that needs the write lock (such as `set`) waits until it is
/// dropped.
pub fn get_ref(&self) -> impl std::ops::Deref<Target = T> + '_ {
    let guard = self.inner.read();
    parking_lot::RwLockReadGuard::map(guard, |state| &state.value)
}
/// Replaces the value and synchronously notifies every observer with
/// `(&old_value, &new_value)`.
///
/// Observers run on the calling thread after the write lock has been
/// released, so they may call back into this property. A panicking observer
/// is caught and reported on stderr; the remaining observers still run.
/// The current implementation always returns `Ok`.
pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
    let (old_value, observers_snapshot) = {
        let mut inner = self.inner.write();
        // Move the previous value out instead of cloning it first.
        let old_value = std::mem::replace(&mut inner.value, new_value.clone());
        let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
        (old_value, observers_snapshot)
    };
    for observer in observers_snapshot {
        let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            observer(&old_value, &new_value);
        }));
        if let Err(payload) = outcome {
            // The payload is a `Box<dyn Any>`; formatting it with `{:?}` only
            // prints `Any { .. }`, so recover the message for the common
            // `&str` / `String` payloads produced by `panic!`.
            let reason = payload
                .downcast_ref::<&str>()
                .map(|s| (*s).to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_string());
            eprintln!("Observer panic: {}", reason);
        }
    }
    Ok(())
}
/// Replaces the value and notifies every observer from its own spawned Tokio
/// task, then waits for all of those tasks.
///
/// Each task takes a permit from the shared semaphore before running its
/// observer, which bounds concurrency. Observer panics are caught inside the
/// task and reported on stderr.
///
/// # Errors
///
/// Returns [`PropertyError::JoinError`] describing the first task that could
/// not be joined. Every task is awaited before the error is returned.
pub async fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
    let (old_value, observers_snapshot) = {
        let mut inner = self.inner.write();
        // Move the previous value out instead of cloning it first.
        let old_value = std::mem::replace(&mut inner.value, new_value.clone());
        let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
        (old_value, observers_snapshot)
    };
    if observers_snapshot.is_empty() {
        return Ok(());
    }
    let mut tasks = Vec::with_capacity(observers_snapshot.len());
    for observer in observers_snapshot {
        let old_val = old_value.clone();
        let new_val = new_value.clone();
        let semaphore = Arc::clone(&self.async_task_semaphore);
        tasks.push(task::spawn(async move {
            // The semaphore is never closed by the code in this file, so a
            // failure here is an invariant violation.
            let _permit = semaphore.acquire().await.expect("Semaphore closed");
            let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                observer(&old_val, &new_val);
            }));
            if let Err(payload) = outcome {
                // `{:?}` on the boxed payload only prints `Any { .. }`;
                // recover the message for `&str` / `String` payloads.
                let reason = payload
                    .downcast_ref::<&str>()
                    .map(|s| (*s).to_string())
                    .or_else(|| payload.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "non-string panic payload".to_string());
                eprintln!("Observer panic in task: {}", reason);
            }
        }));
    }
    // Await every handle even after a failure so no task is silently
    // abandoned; only the first failure is reported.
    let mut first_error = None;
    for handle in tasks {
        if let Err(e) = handle.await {
            if first_error.is_none() {
                // The variant's Display text already adds the
                // "Task join error: " prefix, so store the bare message.
                first_error = Some(PropertyError::JoinError(e.to_string()));
            }
        }
    }
    match first_error {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Computes a new value from a clone of the current one and stores it via
/// `set`, notifying observers synchronously.
///
/// NOTE(review): the read and the write use separate lock acquisitions, so a
/// concurrent `set` that lands between them is overwritten.
pub fn update<F>(&self, update_fn: F) -> Result<(), PropertyError>
where
    F: FnOnce(T) -> T,
{
    let current = self.get()?;
    self.set(update_fn(current))
}
/// Computes a new value from a clone of the current one and stores it via
/// `set_async`, notifying observers from spawned tasks.
///
/// NOTE(review): the read and the write use separate lock acquisitions, so a
/// concurrent `set` that lands between them is overwritten.
pub async fn update_async<F>(&self, update_fn: F) -> Result<(), PropertyError>
where
    F: FnOnce(T) -> T,
{
    let current = self.get()?;
    let next = update_fn(current);
    self.set_async(next).await
}
/// Registers `observer` and returns the id needed to remove it later.
///
/// # Errors
///
/// Returns [`PropertyError::CapacityExceeded`] when the number of registered
/// observers has already reached `config.max_observers`.
pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
    let limit = self.config.max_observers;
    let mut state = self.inner.write();
    let registered = state.observers.len();
    if registered >= limit {
        return Err(PropertyError::CapacityExceeded {
            current: registered,
            max: limit,
            resource: String::from("observers"),
        });
    }
    // Ids are handed out from a monotonically increasing counter.
    let id = ObserverId(state.next_id);
    state.next_id += 1;
    state.observers.insert(id, observer);
    Ok(id)
}
/// Removes the observer registered under `id`.
///
/// # Errors
///
/// Returns [`PropertyError::ObserverNotFound`] when no observer has that id.
pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
    match self.inner.write().observers.remove(&id) {
        Some(_) => Ok(()),
        None => Err(PropertyError::ObserverNotFound { id }),
    }
}
/// Removes the observer registered under `id`, reporting success as a
/// boolean instead of an error.
pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
    self.unsubscribe(id).is_ok()
}
/// Number of observers currently registered, read under the read lock.
pub fn observer_count(&self) -> usize {
    let state = self.inner.read();
    state.observers.len()
}
/// Registers `observer` so that it only runs for changes accepted by
/// `filter(&old, &new)`.
///
/// # Errors
///
/// Same as `subscribe`.
pub fn subscribe_filtered<F>(
    &self,
    observer: Observer<T>,
    filter: F,
) -> Result<ObserverId, PropertyError>
where
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    let gated: Observer<T> = Arc::new(move |previous: &T, current: &T| {
        if !filter(previous, current) {
            return;
        }
        observer(previous, current);
    });
    self.subscribe(gated)
}
pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
where
F: Fn(T, T) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let handler = Arc::new(handler);
let semaphore = Arc::clone(&self.async_task_semaphore);
let observer = Arc::new(move |old: &T, new: &T| {
let old_val = old.clone();
let new_val = new.clone();
let handler_clone = Arc::clone(&handler);
let semaphore_clone = Arc::clone(&semaphore);
tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
handler_clone(old_val, new_val).await;
});
});
self.subscribe(observer)
}
pub fn subscribe_async_filtered<F, Fut, Filt>(
&self,
handler: F,
filter: Filt,
) -> Result<ObserverId, PropertyError>
where
F: Fn(T, T) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
let filter = Arc::new(filter);
let handler = Arc::new(handler);
let semaphore = Arc::clone(&self.async_task_semaphore);
let observer = Arc::new(move |old: &T, new: &T| {
if filter(old, new) {
let old_val = old.clone();
let new_val = new.clone();
let handler_clone = Arc::clone(&handler);
let semaphore_clone = Arc::clone(&semaphore);
tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
handler_clone(old_val, new_val).await;
});
}
});
self.subscribe(observer)
}
pub fn map<U, F>(&self, transform: F) -> Result<ObservableProperty<U>, PropertyError>
where
U: Clone + Send + Sync + 'static,
F: Fn(&T) -> U + Send + Sync + 'static,
{
let transform = Arc::new(transform);
let initial_value = transform(&self.get()?);
let derived = ObservableProperty::new(initial_value);
let derived_clone = derived.clone();
self.subscribe(Arc::new(move |_, new_value| {
let transformed = transform(new_value);
if let Err(e) = derived_clone.set(transformed) {
eprintln!("Failed to update derived property: {}", e);
}
}))?;
Ok(derived)
}
/// Unregisters every observer. The current implementation always returns `Ok`.
pub fn clear_observers(&self) -> Result<(), PropertyError> {
    self.inner.write().observers.clear();
    Ok(())
}
/// Shuts the property down by unregistering every observer. The value stays
/// readable and writable afterwards; later changes simply notify nobody.
pub fn shutdown(&self) -> Result<(), PropertyError> {
    self.inner.write().observers.clear();
    Ok(())
}
/// Unregisters every observer, then waits for in-flight observer tasks to
/// finish.
///
/// "Finished" is detected by acquiring every permit of the shared task
/// semaphore: that only succeeds once no spawned observer task is holding or
/// queueing for one. Tasks that were spawned but have not yet been polled are
/// not visible to this check, so the result is best effort.
///
/// The wait is bounded by `timeout`, capped at 500 ms.
///
/// # Returns
///
/// A [`ShutdownReport`] whose `completed_within_timeout` is `true` when the
/// tasks drained inside the grace period and `false` when the wait timed out.
pub async fn shutdown_with_timeout(
    &self,
    timeout: std::time::Duration,
) -> Result<ShutdownReport, PropertyError> {
    use std::time::{Instant, SystemTime, UNIX_EPOCH};
    let start = Instant::now();
    let initiated_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    // Count and clear under one write lock so the reported number matches
    // what was actually removed.
    let observers_cleared = {
        let mut inner = self.inner.write();
        let count = inner.observers.len();
        inner.observers.clear();
        count
    };
    let grace_period = timeout.min(std::time::Duration::from_millis(500));
    // `acquire_many` takes a u32; clamp oversized configurations.
    let all_permits = self
        .config
        .max_concurrent_async_tasks
        .min(u32::MAX as usize) as u32;
    // The acquired permits are released again at the end of this statement.
    let completed_within_timeout = tokio::time::timeout(
        grace_period,
        self.async_task_semaphore.acquire_many(all_permits),
    )
    .await
    .is_ok();
    let shutdown_duration = start.elapsed();
    Ok(ShutdownReport {
        observers_cleared,
        shutdown_duration,
        completed_within_timeout,
        initiated_at_ms,
    })
}
/// Like `subscribe`, but returns a [`Subscription`] that unregisters the
/// observer when dropped.
pub fn subscribe_with_token(&self, observer: Observer<T>) -> Result<Subscription<T>, PropertyError> {
    self.subscribe(observer).map(|id| Subscription {
        id,
        inner: Arc::clone(&self.inner),
    })
}
/// Like `subscribe_filtered`, but returns a [`Subscription`] that unregisters
/// the observer when dropped.
pub fn subscribe_filtered_with_token<F>(
    &self,
    observer: Observer<T>,
    filter: F,
) -> Result<Subscription<T>, PropertyError>
where
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    self.subscribe_filtered(observer, filter).map(|id| Subscription {
        id,
        inner: Arc::clone(&self.inner),
    })
}
/// Like `subscribe_async`, but returns a [`Subscription`] that unregisters the
/// handler when dropped.
pub fn subscribe_async_with_token<F, Fut>(&self, handler: F) -> Result<Subscription<T>, PropertyError>
where
    F: Fn(T, T) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    self.subscribe_async(handler).map(|id| Subscription {
        id,
        inner: Arc::clone(&self.inner),
    })
}
/// Like `subscribe_async_filtered`, but returns a [`Subscription`] that
/// unregisters the handler when dropped.
pub fn subscribe_async_filtered_with_token<F, Fut, Filt>(
    &self,
    handler: F,
    filter: Filt,
) -> Result<Subscription<T>, PropertyError>
where
    F: Fn(T, T) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
    Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    self.subscribe_async_filtered(handler, filter).map(|id| Subscription {
        id,
        inner: Arc::clone(&self.inner),
    })
}
}
/// A property holding `T::default()` with the default configuration.
impl<T: Clone + Send + Sync + 'static + Default> Default for ObservableProperty<T> {
    fn default() -> Self {
        let initial = T::default();
        Self::new(initial)
    }
}
/// Cloning yields another handle onto the same value, observers and task
/// semaphore; only the configuration is copied.
impl<T: Clone> Clone for ObservableProperty<T> {
    fn clone(&self) -> Self {
        let Self {
            inner,
            config,
            async_task_semaphore,
        } = self;
        Self {
            inner: Arc::clone(inner),
            config: config.clone(),
            async_task_semaphore: Arc::clone(async_task_semaphore),
        }
    }
}
impl<T: Clone + Send + Sync + 'static> Clone for PropertyHandle<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
/// Formats as `ObservableProperty { value: .., observers_count: N }`.
///
/// The value and the observer count are read under a single read lock, so
/// they describe the same instant and the value is formatted in place
/// without being cloned.
impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner = self.inner.read();
        f.debug_struct("ObservableProperty")
            .field("value", &inner.value)
            .field("observers_count", &inner.observers.len())
            .finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_new_and_get() -> Result<(), PropertyError> {
let property = ObservableProperty::new(42);
assert_eq!(property.get()?, 42);
Ok(())
}
#[tokio::test]
async fn test_default() {
let property: ObservableProperty<String> = Default::default();
assert_eq!(property.get().unwrap(), String::default());
}
#[tokio::test]
async fn test_get_ref() {
let property = ObservableProperty::new("hello".to_string());
let value_ref = property.get_ref();
assert_eq!(*value_ref, "hello");
}
#[tokio::test]
async fn test_set() -> Result<(), PropertyError> {
let property = ObservableProperty::new(10);
property.set(20)?;
assert_eq!(property.get()?, 20);
Ok(())
}
#[tokio::test]
async fn test_update() -> Result<(), PropertyError> {
let property = ObservableProperty::new(10);
property.update(|val| val * 2)?;
assert_eq!(property.get()?, 20);
Ok(())
}
#[tokio::test]
async fn test_update_async() -> Result<(), PropertyError> {
let property = ObservableProperty::new(10);
property.update_async(|val| val * 2).await?;
assert_eq!(property.get()?, 20);
Ok(())
}
#[tokio::test]
async fn test_map() -> Result<(), PropertyError> {
let property = ObservableProperty::new(10);
let derived = property.map(|val| val.to_string())?;
assert_eq!(derived.get()?, "10");
property.set(20)?;
assert_eq!(derived.get()?, "20");
Ok(())
}
#[tokio::test]
async fn test_set_async() -> Result<(), PropertyError> {
let property = ObservableProperty::new("hello".to_string());
property.set_async("world".to_string()).await?;
assert_eq!(property.get()?, "world");
Ok(())
}
#[tokio::test]
async fn test_clone() -> Result<(), PropertyError> {
let property1 = ObservableProperty::new(100);
let property2 = property1.clone();
property2.set(200)?;
assert_eq!(property1.get()?, 200);
assert_eq!(property2.get()?, 200);
Ok(())
}
#[tokio::test]
async fn test_subscribe_and_notify() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
property.subscribe(Arc::new(move |_, _| {
counter_clone.fetch_add(1, Ordering::SeqCst);
}))?;
property.set(1)?;
property.set(2)?;
property.set(3)?;
assert_eq!(counter.load(Ordering::SeqCst), 3);
Ok(())
}
#[tokio::test]
async fn test_observer_count() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
assert_eq!(property.observer_count(), 0);
let id1 = property.subscribe(Arc::new(|_, _| {}))?;
let id2 = property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 2);
property.unsubscribe(id1)?;
assert_eq!(property.observer_count(), 1);
property.unsubscribe(id2)?;
assert_eq!(property.observer_count(), 0);
Ok(())
}
#[tokio::test]
async fn test_try_unsubscribe() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let id = property.subscribe(Arc::new(|_, _| {}))?;
assert!(property.try_unsubscribe(id));
assert!(!property.try_unsubscribe(id));
Ok(())
}
#[tokio::test]
async fn test_subscribe_async() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
property.subscribe_async(move |_, _| {
let counter = counter_clone.clone();
async move {
sleep(Duration::from_millis(10)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
})?;
property.set_async(1).await?;
property.set_async(2).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_subscribe_async_filtered() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
property.subscribe_async_filtered(
move |_, _| {
let counter = counter_clone.clone();
async move {
sleep(Duration::from_millis(10)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
},
|old, new| new > old
)?;
property.set_async(10).await?; property.set_async(5).await?; property.set_async(15).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_multiple_observers() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter1 = Arc::new(AtomicUsize::new(0));
let counter2 = Arc::new(AtomicUsize::new(0));
property.subscribe(Arc::new({
let counter = counter1.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.subscribe(Arc::new({
let counter = counter2.clone();
move |_, _| { counter.fetch_add(2, Ordering::SeqCst); }
}))?;
property.set(42)?;
assert_eq!(counter1.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_unsubscribe() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let id = property.subscribe(Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.set(1)?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
property.unsubscribe(id)?;
property.set(2)?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
match property.unsubscribe(id) {
Err(PropertyError::ObserverNotFound { .. }) => {},
other => panic!("Expected ObserverNotFound error, got {:?}", other),
}
Ok(())
}
#[tokio::test]
async fn test_filtered_observer() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
property.subscribe_filtered(
Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}),
|old, new| new > old )?;
property.set(10)?; property.set(5)?; property.set(15)?;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_concurrent_modifications() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let property = Arc::new(ObservableProperty::new(0));
let final_counter = Arc::new(AtomicUsize::new(0));
property.subscribe(Arc::new({
let counter = final_counter.clone();
move |_, new| {
counter.store(*new, Ordering::SeqCst);
}
}))?;
let mut tasks = vec![];
for i in 1..=5 {
let prop = property.clone();
let task = tokio::spawn(async move {
prop.set(i).map_err(|e| format!("Failed to set property: {}", e))
});
tasks.push(task);
}
for task in tasks {
task.await??;
}
let final_value = final_counter.load(Ordering::SeqCst);
assert!(final_value >= 1 && final_value <= 5);
Ok(())
}
#[tokio::test]
async fn test_observer_panic_handling() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
property.subscribe(Arc::new(|_, _| {
panic!("This observer intentionally panics");
}))?;
property.subscribe(Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.set(42)?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
Ok(())
}
#[tokio::test]
async fn test_async_observers_with_async_set() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
property.subscribe(Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
let counter_clone = counter.clone();
property.subscribe_async(move |_, _| {
let counter = counter_clone.clone();
async move {
sleep(Duration::from_millis(10)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
})?;
property.set_async(42).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_many_observers() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..100 {
property.subscribe(Arc::new({
let counter = counter.clone();
move |_, _| {
counter.fetch_add(1, Ordering::SeqCst);
}
}))?;
}
property.set_async(999).await?;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 100);
Ok(())
}
#[tokio::test]
async fn test_observer_receives_correct_values() -> Result<(), PropertyError> {
let property = ObservableProperty::new(100);
let vals = Arc::new((AtomicUsize::new(0), AtomicUsize::new(0)));
property.subscribe(Arc::new({
let vals = vals.clone();
move |old, new| {
vals.0.store(*old, Ordering::SeqCst);
vals.1.store(*new, Ordering::SeqCst);
}
}))?;
property.set(200)?;
assert_eq!(vals.0.load(Ordering::SeqCst), 100);
assert_eq!(vals.1.load(Ordering::SeqCst), 200);
Ok(())
}
#[derive(Debug, Clone, PartialEq)]
struct Person {
name: String,
age: u32,
}
#[tokio::test]
async fn test_complex_data_type() -> Result<(), PropertyError> {
let person1 = Person {
name: "Alice".to_string(),
age: 30,
};
let person2 = Person {
name: "Bob".to_string(),
age: 25,
};
let property = ObservableProperty::new(person1.clone());
assert_eq!(property.get()?, person1);
let name_changes = Arc::new(AtomicUsize::new(0));
property.subscribe_filtered(
Arc::new({
let counter = name_changes.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}),
|old, new| old.name != new.name )?;
let mut person3 = person1.clone();
person3.age = 31;
property.set(person3)?;
assert_eq!(name_changes.load(Ordering::SeqCst), 0);
property.set(person2)?;
assert_eq!(name_changes.load(Ordering::SeqCst), 1);
Ok(())
}
#[tokio::test]
async fn test_waiting_for_observers() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_for_observer = counter.clone();
property.subscribe_async(move |_, _| {
let counter = counter_for_observer.clone();
async move {
sleep(Duration::from_millis(50)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
})?;
property.set_async(42).await?;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
Ok(())
}
#[tokio::test]
async fn test_subscription_auto_cleanup() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
{
let _subscription = property.subscribe_with_token(Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.set(1)?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
property.set(2)?;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
property.set(3)?;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let filter = |old: &i32, new: &i32| new > old;
{
let _subscription = property.subscribe_filtered_with_token(
Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}),
filter
)?;
property.set(10)?; assert_eq!(counter.load(Ordering::SeqCst), 1);
property.set(5)?; assert_eq!(counter.load(Ordering::SeqCst), 1);
property.set(15)?; assert_eq!(counter.load(Ordering::SeqCst), 2);
}
property.set(20)?;
assert_eq!(counter.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_async_subscription_auto_cleanup() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
{
let _subscription = property.subscribe_async_with_token(move |_, _| {
let counter = counter_clone.clone(); async move {
sleep(Duration::from_millis(10)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
})?;
property.set_async(1).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
property.set_async(2).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
Ok(())
}
#[tokio::test]
async fn test_async_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
{
let _subscription = property.subscribe_async_filtered_with_token(
move |_, _| {
let counter = counter_clone.clone(); async move {
sleep(Duration::from_millis(10)).await;
counter.fetch_add(1, Ordering::SeqCst);
}
},
|old, new| new > old )?;
property.set_async(10).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
property.set_async(5).await?; sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
property.set_async(15).await?;
sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
Ok(())
}
#[tokio::test]
async fn test_multiple_subscriptions() -> Result<(), PropertyError> {
let property = ObservableProperty::new(0);
let counter1 = Arc::new(AtomicUsize::new(0));
let counter2 = Arc::new(AtomicUsize::new(0));
let subscription1 = property.subscribe_with_token(Arc::new({
let counter = counter1.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
let subscription2 = property.subscribe_with_token(Arc::new({
let counter = counter2.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.set(1)?;
assert_eq!(counter1.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 1);
drop(subscription1);
property.set(2)?;
assert_eq!(counter1.load(Ordering::SeqCst), 1); assert_eq!(counter2.load(Ordering::SeqCst), 2);
drop(subscription2);
property.set(3)?;
assert_eq!(counter1.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn test_subscription_with_property_drop() -> Result<(), PropertyError> {
let counter = Arc::new(AtomicUsize::new(0));
let subscription;
{
let property = ObservableProperty::new(0);
subscription = property.subscribe_with_token(Arc::new({
let counter = counter.clone();
move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
}))?;
property.set(1)?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
drop(subscription);
Ok(())
}
#[tokio::test]
async fn test_cleanup_methods() -> Result<(), PropertyError> {
let property = ObservableProperty::new(42);
let counter = Arc::new(AtomicUsize::new(0));
let counter1 = counter.clone();
property.subscribe(Arc::new(move |_, _| {
counter1.fetch_add(1, Ordering::SeqCst);
}))?;
let counter2 = counter.clone();
property.subscribe_async(move |_, _| {
let counter = counter2.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
})?;
assert_eq!(property.observer_count(), 2);
property.clear_observers()?;
assert_eq!(property.observer_count(), 0);
property.set(100)?;
assert_eq!(counter.load(Ordering::SeqCst), 0);
let counter3 = counter.clone();
property.subscribe(Arc::new(move |_, _| {
counter3.fetch_add(1, Ordering::SeqCst);
}))?;
assert_eq!(property.observer_count(), 1);
property.shutdown()?;
assert_eq!(property.observer_count(), 0);
property.set(200)?;
assert_eq!(counter.load(Ordering::SeqCst), 0);
Ok(())
}
/// Pins every default in `PropertyConfig`, including the async-task limit
/// that sizes the task semaphore.
#[tokio::test]
async fn test_property_config_default() {
    let config = PropertyConfig::default();
    assert_eq!(config.max_observers, 1000);
    assert_eq!(config.max_pending_notifications, 100);
    assert_eq!(config.observer_timeout_ms, 5000);
    assert_eq!(config.max_concurrent_async_tasks, 100);
}
#[tokio::test]
async fn test_property_with_custom_config() -> Result<(), PropertyError> {
let config = PropertyConfig {
max_observers: 5,
max_pending_notifications: 10,
observer_timeout_ms: 1000,
max_concurrent_async_tasks: 100,
};
let property = ObservableProperty::new_with_config(42, config);
assert_eq!(property.get()?, 42);
for i in 0..5 {
property.subscribe(Arc::new(move |_, _| {
println!("Observer {}", i);
}))?;
}
assert_eq!(property.observer_count(), 5);
Ok(())
}
#[tokio::test]
async fn test_observer_capacity_limit() -> Result<(), PropertyError> {
let config = PropertyConfig {
max_observers: 3,
max_pending_notifications: 100,
observer_timeout_ms: 5000,
max_concurrent_async_tasks: 100,
};
let property = ObservableProperty::new_with_config(0, config);
property.subscribe(Arc::new(|_, _| {}))?;
property.subscribe(Arc::new(|_, _| {}))?;
property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 3);
let result = property.subscribe(Arc::new(|_, _| {}));
assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
if let Err(PropertyError::CapacityExceeded { current, max, resource }) = result {
assert_eq!(current, 3);
assert_eq!(max, 3);
assert_eq!(resource, "observers");
}
Ok(())
}
/// Unsubscribing frees a slot: a full property accepts exactly one new
/// observer after one is removed, and is full again afterwards.
#[tokio::test]
async fn test_observer_capacity_after_unsubscribe() -> Result<(), PropertyError> {
    let limits = PropertyConfig {
        max_observers: 2,
        max_pending_notifications: 100,
        observer_timeout_ms: 5000,
        max_concurrent_async_tasks: 100,
    };
    let prop = ObservableProperty::new_with_config(0, limits);

    let first = prop.subscribe(Arc::new(|_, _| {}))?;
    let _second = prop.subscribe(Arc::new(|_, _| {}))?;
    assert_eq!(prop.observer_count(), 2);
    assert!(prop.subscribe(Arc::new(|_, _| {})).is_err());

    // Removing one observer makes room for exactly one replacement.
    prop.unsubscribe(first)?;
    assert_eq!(prop.observer_count(), 1);

    let _replacement = prop.subscribe(Arc::new(|_, _| {}))?;
    assert_eq!(prop.observer_count(), 2);
    assert!(prop.subscribe(Arc::new(|_, _| {})).is_err());
    Ok(())
}
/// Async observers count against `max_observers` as well: the third
/// `subscribe_async` on a two-observer property is rejected.
#[tokio::test]
async fn test_async_observer_capacity_limit() -> Result<(), PropertyError> {
    let limits = PropertyConfig {
        max_observers: 2,
        max_pending_notifications: 100,
        observer_timeout_ms: 5000,
        max_concurrent_async_tasks: 100,
    };
    let prop = ObservableProperty::new_with_config(0, limits);

    for _ in 0..2 {
        prop.subscribe_async(|_, _| async move {
            sleep(Duration::from_millis(10)).await;
        })?;
    }
    assert_eq!(prop.observer_count(), 2);

    let rejected = prop.subscribe_async(|_, _| async move {});
    assert!(matches!(rejected, Err(PropertyError::CapacityExceeded { .. })));
    Ok(())
}
/// Filtered observers count against `max_observers` as well: the third
/// `subscribe_filtered` on a two-observer property is rejected.
#[tokio::test]
async fn test_filtered_observer_capacity_limit() -> Result<(), PropertyError> {
    let limits = PropertyConfig {
        max_observers: 2,
        max_pending_notifications: 100,
        observer_timeout_ms: 5000,
        max_concurrent_async_tasks: 100,
    };
    let prop = ObservableProperty::new_with_config(0, limits);

    for _ in 0..2 {
        prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)?;
    }
    assert_eq!(prop.observer_count(), 2);

    let rejected = prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true);
    assert!(matches!(rejected, Err(PropertyError::CapacityExceeded { .. })));
    Ok(())
}
/// `diagnostic_info` for `CapacityExceeded` lists the tag, resource, counts
/// and a utilization percentage.
#[tokio::test]
async fn test_capacity_error_diagnostic() {
    let report = PropertyError::CapacityExceeded {
        current: 100,
        max: 100,
        resource: "observers".to_string(),
    }
    .diagnostic_info();

    let expected_fragments = [
        "CAPACITY_EXCEEDED",
        "resource=observers",
        "current=100",
        "max=100",
        "utilization=100.0%",
    ];
    for fragment in expected_fragments {
        assert!(report.contains(fragment));
    }
}
/// A clone shares observers and limits with its source: subscriptions made
/// through either handle count toward the same `max_observers`.
#[tokio::test]
async fn test_cloned_property_shares_config() -> Result<(), PropertyError> {
    let limits = PropertyConfig {
        max_observers: 3,
        max_pending_notifications: 100,
        observer_timeout_ms: 5000,
        max_concurrent_async_tasks: 100,
    };
    let original = ObservableProperty::new_with_config(0, limits);
    let alias = original.clone();

    // Alternate between the two handles while filling the shared capacity.
    original.subscribe(Arc::new(|_, _| {}))?;
    alias.subscribe(Arc::new(|_, _| {}))?;
    original.subscribe(Arc::new(|_, _| {}))?;

    assert_eq!(original.observer_count(), 3);
    assert_eq!(alias.observer_count(), 3);

    // Both handles now see the property as full.
    assert!(original.subscribe(Arc::new(|_, _| {})).is_err());
    assert!(alias.subscribe(Arc::new(|_, _| {})).is_err());
    Ok(())
}
/// Dropping a subscription token frees its slot, so a property that was at
/// capacity accepts a new token-based subscription afterwards.
#[tokio::test]
async fn test_subscription_token_with_capacity() -> Result<(), PropertyError> {
    let limits = PropertyConfig {
        max_observers: 2,
        max_pending_notifications: 100,
        observer_timeout_ms: 5000,
        max_concurrent_async_tasks: 100,
    };
    let prop = ObservableProperty::new_with_config(0, limits);

    // Tokens must stay bound to names; binding to `_` would drop them at once.
    let first_token = prop.subscribe_with_token(Arc::new(|_, _| {}))?;
    let _second_token = prop.subscribe_with_token(Arc::new(|_, _| {}))?;
    assert_eq!(prop.observer_count(), 2);

    let rejected = prop.subscribe_with_token(Arc::new(|_, _| {}));
    assert!(matches!(rejected, Err(PropertyError::CapacityExceeded { .. })));

    drop(first_token);
    let _third_token = prop.subscribe_with_token(Arc::new(|_, _| {}))?;
    assert_eq!(prop.observer_count(), 2);
    Ok(())
}
/// Checks, variant by variant, that `PropertyError::diagnostic_info` contains
/// the expected tag and `key=value` fragments.
#[tokio::test]
async fn test_error_diagnostic_info() {
    // Asserts that every fragment occurs somewhere in `diagnostic`.
    fn assert_has_all(diagnostic: &str, fragments: &[&str]) {
        for &fragment in fragments {
            assert!(diagnostic.contains(fragment));
        }
    }

    let read_error = PropertyError::read_lock_error("get_value", "acquiring read lock failed");
    assert_has_all(
        &read_error.diagnostic_info(),
        &[
            "READ_LOCK_ERROR",
            "operation=get_value",
            "context=acquiring read lock failed",
            "timestamp_ms=",
        ],
    );

    let write_error = PropertyError::write_lock_error("set_value", "acquiring write lock failed");
    assert_has_all(
        &write_error.diagnostic_info(),
        &["WRITE_LOCK_ERROR", "operation=set_value"],
    );

    let poisoned_error = PropertyError::lock_poisoned("notify", "inner lock poisoned");
    assert_has_all(
        &poisoned_error.diagnostic_info(),
        &[
            "LOCK_POISONED",
            "operation=notify",
            "context=inner lock poisoned",
        ],
    );

    let panic_error = PropertyError::observer_panic(ObserverId(42), "observer crashed");
    assert_has_all(
        &panic_error.diagnostic_info(),
        &["OBSERVER_PANIC", "observer_id=42", "error=observer crashed"],
    );

    let not_found_error = PropertyError::ObserverNotFound { id: ObserverId(99) };
    assert_has_all(
        &not_found_error.diagnostic_info(),
        &["OBSERVER_NOT_FOUND", "id=99"],
    );

    let capacity_error = PropertyError::CapacityExceeded {
        current: 150,
        max: 100,
        resource: "observers".to_string(),
    };
    assert_has_all(
        &capacity_error.diagnostic_info(),
        &[
            "CAPACITY_EXCEEDED",
            "resource=observers",
            "current=150",
            "max=100",
            "utilization=150.0%",
        ],
    );

    let timeout_error = PropertyError::OperationTimeout {
        operation: "notify_all".to_string(),
        elapsed_ms: 5500,
        threshold_ms: 5000,
    };
    assert_has_all(
        &timeout_error.diagnostic_info(),
        &[
            "OPERATION_TIMEOUT",
            "operation=notify_all",
            "elapsed_ms=5500",
            "threshold_ms=5000",
            "overage_ms=500",
        ],
    );

    let shutdown_error = PropertyError::ShutdownInProgress;
    assert_has_all(&shutdown_error.diagnostic_info(), &["SHUTDOWN_IN_PROGRESS"]);

    let observer_error = PropertyError::ObserverError {
        reason: "callback failed".to_string(),
    };
    assert_has_all(
        &observer_error.diagnostic_info(),
        &["OBSERVER_ERROR", "reason=callback failed"],
    );

    let tokio_error = PropertyError::TokioError {
        reason: "runtime unavailable".to_string(),
    };
    assert_has_all(
        &tokio_error.diagnostic_info(),
        &["TOKIO_ERROR", "reason=runtime unavailable"],
    );

    let join_error = PropertyError::JoinError("task panicked".to_string());
    assert_has_all(
        &join_error.diagnostic_info(),
        &["JOIN_ERROR", "message=task panicked"],
    );
}
/// The error helper constructors stamp `timestamp_ms` with the wall-clock
/// time at creation: each stamp must fall between readings taken just before
/// and just after the helpers are called.
#[tokio::test]
async fn test_error_helper_functions_with_timestamp() {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Milliseconds since the Unix epoch, in the same unit as `timestamp_ms`.
    let now_ms = || {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    };

    let before = now_ms();
    let read_error = PropertyError::read_lock_error("test_op", "test_context");
    let write_error = PropertyError::write_lock_error("test_op", "test_context");
    let poisoned_error = PropertyError::lock_poisoned("test_op", "test_context");
    let panic_error = PropertyError::observer_panic(ObserverId(1), "test_panic");
    let after = now_ms();

    let in_window = |timestamp_ms: u64| timestamp_ms >= before && timestamp_ms <= after;

    match read_error {
        PropertyError::ReadLockError { timestamp_ms, .. } => assert!(in_window(timestamp_ms)),
        _ => panic!("Expected ReadLockError"),
    }
    match write_error {
        PropertyError::WriteLockError { timestamp_ms, .. } => assert!(in_window(timestamp_ms)),
        _ => panic!("Expected WriteLockError"),
    }
    match poisoned_error {
        PropertyError::LockPoisoned { timestamp_ms, .. } => assert!(in_window(timestamp_ms)),
        _ => panic!("Expected LockPoisoned"),
    }
    match panic_error {
        PropertyError::ObserverPanic { timestamp_ms, .. } => assert!(in_window(timestamp_ms)),
        _ => panic!("Expected ObserverPanic"),
    }
}
/// The `Display` output of `OperationTimeout` and `CapacityExceeded` includes
/// the values of their fields.
#[tokio::test]
async fn test_error_display_formatting() {
    let timeout_text = PropertyError::OperationTimeout {
        operation: "test_operation".to_string(),
        elapsed_ms: 1500,
        threshold_ms: 1000,
    }
    .to_string();
    for fragment in ["test_operation", "1500ms", "1000ms"] {
        assert!(timeout_text.contains(fragment));
    }

    let capacity_text = PropertyError::CapacityExceeded {
        current: 200,
        max: 100,
        resource: "test_resource".to_string(),
    }
    .to_string();
    for fragment in ["200", "100", "test_resource"] {
        assert!(capacity_text.contains(fragment));
    }
}
/// The utilization percentage in the `CapacityExceeded` diagnostic follows
/// `current / max`, shown with one decimal place.
#[tokio::test]
async fn test_capacity_exceeded_utilization_calculation() {
    // (current observers out of a maximum of 100, expected diagnostic fragment)
    let cases = [(75, "utilization=75.0%"), (100, "utilization=100.0%")];

    for (current, expected) in cases {
        let error = PropertyError::CapacityExceeded {
            current,
            max: 100,
            resource: "observers".to_string(),
        };
        assert!(error.diagnostic_info().contains(expected));
    }
}
/// `shutdown_with_timeout` clears all observers, returns a populated report,
/// and leaves the property settable without notifying anyone.
#[tokio::test]
async fn test_shutdown_with_timeout_basic() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    // One synchronous and one asynchronous observer, both counting calls.
    prop.subscribe(Arc::new({
        let invocations = Arc::clone(&invocations);
        move |_, _| {
            invocations.fetch_add(1, Ordering::SeqCst);
        }
    }))?;
    prop.subscribe_async({
        let invocations = Arc::clone(&invocations);
        move |_, _| {
            let invocations = Arc::clone(&invocations);
            async move {
                invocations.fetch_add(1, Ordering::SeqCst);
            }
        }
    })?;
    assert_eq!(prop.observer_count(), 2);

    let summary = prop.shutdown_with_timeout(Duration::from_secs(5)).await?;
    assert_eq!(summary.observers_cleared, 2);
    assert!(summary.shutdown_duration.as_secs() < 5);
    assert!(summary.completed_within_timeout);
    assert!(summary.initiated_at_ms > 0);
    assert_eq!(prop.observer_count(), 0);

    // A set after shutdown succeeds but reaches no observer.
    prop.set(42)?;
    assert_eq!(invocations.load(Ordering::SeqCst), 0);
    Ok(())
}
/// The shutdown report's `diagnostic_info` contains the completion tag, the
/// cleared-observer count, the timeout flag and the start timestamp key.
#[tokio::test]
async fn test_shutdown_report_diagnostic() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = ObservableProperty::new(100);
    for _ in 0..5 {
        prop.subscribe(Arc::new(|_, _| {}))?;
    }

    let text = prop
        .shutdown_with_timeout(Duration::from_secs(1))
        .await?
        .diagnostic_info();

    for fragment in [
        "SHUTDOWN_COMPLETE",
        "observers_cleared=5",
        "within_timeout=true",
        "initiated_at_ms=",
    ] {
        assert!(text.contains(fragment));
    }
    Ok(())
}
/// Shutting down shortly after a notification was sent to slow async
/// observers still clears both observers within the timeout.
#[tokio::test]
async fn test_shutdown_with_async_observers() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = ObservableProperty::new(0);
    let completed = Arc::new(AtomicUsize::new(0));

    // Two identical async observers that each take 50 ms to finish.
    for _ in 0..2 {
        let completed = Arc::clone(&completed);
        prop.subscribe_async(move |_, _| {
            let completed = Arc::clone(&completed);
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                completed.fetch_add(1, Ordering::SeqCst);
            }
        })?;
    }

    prop.set_async(42).await?;
    tokio::time::sleep(Duration::from_millis(10)).await;

    let summary = prop.shutdown_with_timeout(Duration::from_secs(2)).await?;
    assert_eq!(summary.observers_cleared, 2);
    assert!(summary.completed_within_timeout);
    Ok(())
}
/// A second `shutdown_with_timeout` succeeds and reports nothing left to clear.
#[tokio::test]
async fn test_shutdown_idempotent() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = ObservableProperty::new("test");
    for _ in 0..2 {
        prop.subscribe(Arc::new(|_, _| {}))?;
    }

    let first = prop.shutdown_with_timeout(Duration::from_secs(1)).await?;
    assert_eq!(first.observers_cleared, 2);

    let second = prop.shutdown_with_timeout(Duration::from_secs(1)).await?;
    assert_eq!(second.observers_cleared, 0);
    Ok(())
}
/// Both shutdown flavours leave the property with no observers; the timeout
/// variant additionally reports how many were cleared.
#[tokio::test]
async fn test_shutdown_vs_shutdown_with_timeout() -> Result<(), PropertyError> {
    use std::time::Duration;

    // Plain synchronous shutdown.
    let plain = ObservableProperty::new(0);
    for _ in 0..2 {
        plain.subscribe(Arc::new(|_, _| {}))?;
    }
    plain.shutdown()?;
    assert_eq!(plain.observer_count(), 0);

    // Shutdown with a timeout and a report.
    let timed = ObservableProperty::new(0);
    for _ in 0..2 {
        timed.subscribe(Arc::new(|_, _| {}))?;
    }
    let summary = timed.shutdown_with_timeout(Duration::from_secs(1)).await?;
    assert_eq!(timed.observer_count(), 0);
    assert_eq!(summary.observers_cleared, 2);
    Ok(())
}
/// The duration recorded in the shutdown report never exceeds the wall-clock
/// time measured around the call.
#[tokio::test]
async fn test_shutdown_report_timing() -> Result<(), PropertyError> {
    use std::time::{Duration, Instant};

    let prop = ObservableProperty::new(0);
    for _ in 0..10 {
        prop.subscribe(Arc::new(|_, _| {}))?;
    }

    let started = Instant::now();
    let summary = prop.shutdown_with_timeout(Duration::from_secs(1)).await?;
    let wall_clock = started.elapsed();

    assert!(wall_clock < Duration::from_secs(2));
    assert!(summary.shutdown_duration <= wall_clock);
    assert_eq!(summary.observers_cleared, 10);
    Ok(())
}
/// Shutdown clears every observer flavour: plain, async, filtered and
/// async-filtered.
#[tokio::test]
async fn test_shutdown_with_filtered_and_async_observers() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = ObservableProperty::new(0);

    // One observer of each kind.
    prop.subscribe(Arc::new(|_, _| {}))?;
    prop.subscribe_async(|_, _| async {})?;
    prop.subscribe_filtered(Arc::new(|_, _| {}), |_, new| new % 2 == 0)?;
    prop.subscribe_async_filtered(|_, _| async {}, |_, new| new > &0)?;
    assert_eq!(prop.observer_count(), 4);

    let summary = prop.shutdown_with_timeout(Duration::from_secs(1)).await?;
    assert_eq!(summary.observers_cleared, 4);
    assert_eq!(prop.observer_count(), 0);
    Ok(())
}
/// Both constructors expose the initial value through `get`.
#[tokio::test]
async fn test_batched_property_creation() -> Result<(), PropertyError> {
    let with_defaults = BatchedProperty::new(42);
    assert_eq!(with_defaults.get()?, 42);

    let with_custom_interval = BatchedProperty::new_with_config(
        100,
        BatchConfig {
            batch_interval: std::time::Duration::from_millis(50),
        },
    );
    assert_eq!(with_custom_interval.get()?, 100);
    Ok(())
}
#[tokio::test]
async fn test_batched_property_queue_update() -> Result<(), PropertyError> {
use std::time::Duration;
let property = BatchedProperty::new(0);
let counter = Arc::new(AtomicUsize::new(0));
let last_value = Arc::new(RwLock::new(0));
property.subscribe(Arc::new({
let counter = counter.clone();
let last_value = last_value.clone();
move |_, new| {
counter.fetch_add(1, Ordering::SeqCst);
*last_value.write() = *new;
}
}))?;
for i in 1..=10 {
property.queue_update(i)?;
}
tokio::time::sleep(Duration::from_millis(150)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(*last_value.read(), 10);
assert_eq!(property.get()?, 10);
Ok(())
}
/// Two bursts of updates separated by more than one batch interval produce
/// exactly two notifications, and the final value is the last one queued.
#[tokio::test]
async fn test_batched_property_multiple_batches() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = BatchedProperty::new_with_config(
        0,
        BatchConfig {
            batch_interval: Duration::from_millis(50),
        },
    );
    let notifications = Arc::new(AtomicUsize::new(0));

    let seen = Arc::clone(&notifications);
    prop.subscribe(Arc::new(move |_, _| {
        seen.fetch_add(1, Ordering::SeqCst);
    }))?;

    // First burst, then wait past one 50 ms interval.
    for value in 1..=5 {
        prop.queue_update(value)?;
    }
    tokio::time::sleep(Duration::from_millis(75)).await;

    // Second burst, then wait again.
    for value in 6..=10 {
        prop.queue_update(value)?;
    }
    tokio::time::sleep(Duration::from_millis(75)).await;

    assert_eq!(notifications.load(Ordering::SeqCst), 2);
    assert_eq!(prop.get()?, 10);
    Ok(())
}
/// `set_immediate` notifies right away without waiting for a batch; after the
/// batch interval has passed at least that one notification has been seen.
#[tokio::test]
async fn test_batched_property_set_immediate() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = BatchedProperty::new(0);
    let notifications = Arc::new(AtomicUsize::new(0));

    let seen = Arc::clone(&notifications);
    prop.subscribe(Arc::new(move |_, _| {
        seen.fetch_add(1, Ordering::SeqCst);
    }))?;

    // Two queued updates are still pending when the immediate set happens.
    prop.queue_update(5)?;
    prop.queue_update(10)?;
    prop.set_immediate(42)?;

    assert_eq!(notifications.load(Ordering::SeqCst), 1);
    assert_eq!(prop.get()?, 42);

    tokio::time::sleep(Duration::from_millis(150)).await;
    assert!(notifications.load(Ordering::SeqCst) >= 1);
    Ok(())
}
/// `flush` delivers a queued update without waiting for the batch timer.
#[tokio::test]
async fn test_batched_property_flush() -> Result<(), PropertyError> {
    let prop = BatchedProperty::new(0);
    let notifications = Arc::new(AtomicUsize::new(0));

    let seen = Arc::clone(&notifications);
    prop.subscribe(Arc::new(move |_, _| {
        seen.fetch_add(1, Ordering::SeqCst);
    }))?;

    prop.queue_update(42)?;
    prop.flush().await?;

    assert_eq!(notifications.load(Ordering::SeqCst), 1);
    assert_eq!(prop.get()?, 42);
    Ok(())
}
/// An async observer is invoked once for a batch of five queued updates.
#[tokio::test]
async fn test_batched_property_async_observer() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = BatchedProperty::new(0);
    let completed = Arc::new(AtomicUsize::new(0));

    let handle = Arc::clone(&completed);
    prop.subscribe_async(move |_, _| {
        let handle = Arc::clone(&handle);
        async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            handle.fetch_add(1, Ordering::SeqCst);
        }
    })?;

    for value in 1..=5 {
        prop.queue_update(value)?;
    }

    // Leave time for the batch to fire and for the async observer to finish.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert_eq!(completed.load(Ordering::SeqCst), 1);
    Ok(())
}
/// Subscribe, unsubscribe and clear are reflected by `observer_count` on a
/// batched property.
#[tokio::test]
async fn test_batched_property_observer_management() -> Result<(), PropertyError> {
    let prop = BatchedProperty::new(0);

    let removable = prop.subscribe(Arc::new(|_, _| {}))?;
    let _kept = prop.subscribe(Arc::new(|_, _| {}))?;
    assert_eq!(prop.observer_count(), 2);

    prop.unsubscribe(removable)?;
    assert_eq!(prop.observer_count(), 1);

    prop.clear_observers()?;
    assert_eq!(prop.observer_count(), 0);
    Ok(())
}
/// Clones share state: an update queued through one handle notifies an
/// observer registered through the other, and both read the new value.
#[tokio::test]
async fn test_batched_property_clone() -> Result<(), PropertyError> {
    use std::time::Duration;

    let original = BatchedProperty::new(0);
    let alias = original.clone();
    let notifications = Arc::new(AtomicUsize::new(0));

    // Observe through the clone ...
    let seen = Arc::clone(&notifications);
    alias.subscribe(Arc::new(move |_, _| {
        seen.fetch_add(1, Ordering::SeqCst);
    }))?;

    // ... and update through the original.
    original.queue_update(42)?;
    tokio::time::sleep(Duration::from_millis(150)).await;

    assert_eq!(notifications.load(Ordering::SeqCst), 1);
    assert_eq!(original.get()?, 42);
    assert_eq!(alias.get()?, 42);
    Ok(())
}
/// A thousand updates queued within one batch interval result in a single
/// notification carrying the last value.
#[tokio::test]
async fn test_batched_property_high_frequency() -> Result<(), PropertyError> {
    use std::time::Duration;

    let prop = BatchedProperty::new_with_config(
        0,
        BatchConfig {
            batch_interval: Duration::from_millis(100),
        },
    );
    let notifications = Arc::new(AtomicUsize::new(0));

    let seen = Arc::clone(&notifications);
    prop.subscribe(Arc::new(move |_, _| {
        seen.fetch_add(1, Ordering::SeqCst);
    }))?;

    for value in 1..=1000 {
        prop.queue_update(value)?;
    }
    tokio::time::sleep(Duration::from_millis(150)).await;

    assert_eq!(notifications.load(Ordering::SeqCst), 1);
    assert_eq!(prop.get()?, 1000);
    Ok(())
}
}
/// Settings for a `BatchedProperty`.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Period of the background task that applies the most recently queued
    /// update.
    pub batch_interval: std::time::Duration,
}

impl Default for BatchConfig {
    /// Returns a configuration with a 100 ms batch interval.
    fn default() -> Self {
        const DEFAULT_BATCH_INTERVAL_MS: u64 = 100;

        let batch_interval = std::time::Duration::from_millis(DEFAULT_BATCH_INTERVAL_MS);
        Self { batch_interval }
    }
}
/// An `ObservableProperty` wrapper that coalesces queued updates.
///
/// Updates passed to `queue_update` overwrite a single pending slot; a
/// background Tokio task periodically takes the slot's value and applies it
/// to the inner property, so observers see only the latest queued value per
/// batch. Clones share the inner property, the pending slot and the task
/// handle.
pub struct BatchedProperty<T: Clone + Send + Sync + 'static> {
/// The wrapped property that stores the value and owns the observers.
inner: ObservableProperty<T>,
/// The most recently queued, not yet applied value (`None` when nothing is pending).
pending_update: Arc<RwLock<Option<T>>>,
/// Handle of the background batch task spawned by the constructor; it is
/// stored but never read.
_batch_task: Arc<tokio::task::JoinHandle<()>>,
}
impl<T: Clone + Send + Sync + 'static> BatchedProperty<T> {
pub fn new(initial_value: T) -> Self {
Self::new_with_config(initial_value, BatchConfig::default())
}
pub fn new_with_config(initial_value: T, config: BatchConfig) -> Self {
let inner = ObservableProperty::new(initial_value);
let pending_update = Arc::new(RwLock::new(None));
let inner_clone = inner.clone();
let pending_clone = pending_update.clone();
let batch_interval = config.batch_interval;
let batch_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(batch_interval);
loop {
interval.tick().await;
let update = {
let mut pending = pending_clone.write();
pending.take()
};
if let Some(value) = update {
let _ = inner_clone.set_async(value).await;
}
}
});
Self {
inner,
pending_update,
_batch_task: Arc::new(batch_task),
}
}
pub fn queue_update(&self, value: T) -> Result<(), PropertyError> {
*self.pending_update.write() = Some(value);
Ok(())
}
pub fn set_immediate(&self, value: T) -> Result<(), PropertyError> {
self.inner.set(value)
}
pub async fn set_immediate_async(&self, value: T) -> Result<(), PropertyError> {
self.inner.set_async(value).await
}
pub fn get(&self) -> Result<T, PropertyError> {
self.inner.get()
}
pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
self.inner.subscribe(observer)
}
pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
where
F: Fn(T, T) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
self.inner.subscribe_async(handler)
}
pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
self.inner.unsubscribe(id)
}
pub fn observer_count(&self) -> usize {
self.inner.observer_count()
}
pub fn clear_observers(&self) -> Result<(), PropertyError> {
self.inner.clear_observers()
}
pub async fn flush(&self) -> Result<(), PropertyError> {
let update = {
let mut pending = self.pending_update.write();
pending.take()
};
if let Some(value) = update {
self.inner.set_async(value).await?;
}
Ok(())
}
}
impl<T: Clone + Send + Sync + 'static> Clone for BatchedProperty<T> {
    /// Returns a handle that shares the pending slot and the background task
    /// handle with `self`; the inner property is cloned through its own
    /// `Clone` impl.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self {
            inner,
            pending_update,
            _batch_task,
        } = self;

        Self {
            inner: inner.clone(),
            pending_update: Arc::clone(pending_update),
            _batch_task: Arc::clone(_batch_task),
        }
    }
}
impl From<JoinError> for PropertyError {
fn from(err: JoinError) -> Self {
PropertyError::JoinError(err.to_string())
}
}
/// Tests for the `max_concurrent_async_tasks` limit on async observers.
#[cfg(test)]
mod connection_pool_tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::{sleep, Duration};

    /// With 20 slow async observers and a limit of 5, no more than 5 observer
    /// bodies are ever running at the same time.
    #[tokio::test]
    async fn test_concurrent_task_limiting() -> Result<(), PropertyError> {
        let limits = PropertyConfig {
            max_observers: 1000,
            max_pending_notifications: 1000,
            observer_timeout_ms: 5000,
            max_concurrent_async_tasks: 5,
        };
        let prop = ObservableProperty::new_with_config(0, limits);
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        for _ in 0..20 {
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            prop.subscribe_async(move |_, _| {
                let in_flight = Arc::clone(&in_flight);
                let peak = Arc::clone(&peak);
                async move {
                    // Record the high-water mark of simultaneously running bodies.
                    let now_running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now_running, Ordering::SeqCst);
                    sleep(Duration::from_millis(100)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                }
            })?;
        }

        prop.set_async(42).await?;
        sleep(Duration::from_millis(500)).await;

        let max_reached = peak.load(Ordering::SeqCst);
        println!("Max concurrent tasks: {} (limit: 5)", max_reached);
        assert!(
            max_reached <= 5,
            "Expected max concurrent tasks <= 5, but got {}",
            max_reached
        );
        Ok(())
    }

    /// With a limit of 2 and 5 async observers, every observer still runs to
    /// completion: five "start" and five "end" events are recorded.
    #[tokio::test]
    async fn test_semaphore_blocks_when_max_reached() -> Result<(), PropertyError> {
        let limits = PropertyConfig {
            max_observers: 100,
            max_pending_notifications: 100,
            observer_timeout_ms: 5000,
            max_concurrent_async_tasks: 2,
        };
        let prop = ObservableProperty::new_with_config(0, limits);
        let events = Arc::new(parking_lot::RwLock::new(Vec::new()));

        for i in 0..5 {
            let events = Arc::clone(&events);
            prop.subscribe_async(move |_, _| {
                let events = Arc::clone(&events);
                async move {
                    events.write().push((i, "start"));
                    sleep(Duration::from_millis(50)).await;
                    events.write().push((i, "end"));
                }
            })?;
        }

        prop.set_async(100).await?;
        sleep(Duration::from_millis(300)).await;

        let recorded = events.read();
        println!("Execution order: {:?}", *recorded);
        assert_eq!(recorded.len(), 10, "Should have 5 start and 5 end events");

        let count_phase = |wanted: &str| {
            recorded
                .iter()
                .filter(|(_, phase)| *phase == wanted)
                .count()
        };
        assert_eq!(count_phase("start"), 5, "Should have 5 starts");
        assert_eq!(count_phase("end"), 5, "Should have 5 ends");
        Ok(())
    }

    /// Concurrency permits are returned after each notification round: three
    /// rounds over ten observers yield thirty executions despite a limit of 3.
    #[tokio::test]
    async fn test_permits_released_after_execution() -> Result<(), PropertyError> {
        let limits = PropertyConfig {
            max_observers: 100,
            max_pending_notifications: 100,
            observer_timeout_ms: 5000,
            max_concurrent_async_tasks: 3,
        };
        let prop = ObservableProperty::new_with_config(0, limits);
        let runs = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let runs = Arc::clone(&runs);
            prop.subscribe_async(move |_, _| {
                let runs = Arc::clone(&runs);
                async move {
                    runs.fetch_add(1, Ordering::SeqCst);
                    sleep(Duration::from_millis(10)).await;
                }
            })?;
        }

        // Three notification rounds, each given time to drain.
        for _ in 0..3 {
            prop.set_async(42).await?;
            sleep(Duration::from_millis(100)).await;
        }

        let total = runs.load(Ordering::SeqCst);
        assert_eq!(total, 30, "Expected 30 executions, got {}", total);
        Ok(())
    }

    /// Filtered async observers obey the same concurrency limit as unfiltered
    /// ones.
    #[tokio::test]
    async fn test_filtered_async_observers_respect_limit() -> Result<(), PropertyError> {
        let limits = PropertyConfig {
            max_observers: 100,
            max_pending_notifications: 100,
            observer_timeout_ms: 5000,
            max_concurrent_async_tasks: 3,
        };
        let prop = ObservableProperty::new_with_config(0, limits);
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            prop.subscribe_async_filtered(
                move |_, _| {
                    let in_flight = Arc::clone(&in_flight);
                    let peak = Arc::clone(&peak);
                    async move {
                        let now_running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                        peak.fetch_max(now_running, Ordering::SeqCst);
                        sleep(Duration::from_millis(50)).await;
                        in_flight.fetch_sub(1, Ordering::SeqCst);
                    }
                },
                // Only even new values pass the filter.
                |_, &new| new % 2 == 0,
            )?;
        }

        prop.set_async(100).await?;
        sleep(Duration::from_millis(200)).await;

        let max_reached = peak.load(Ordering::SeqCst);
        println!("Max concurrent filtered async tasks: {}", max_reached);
        assert!(
            max_reached <= 3,
            "Expected max concurrent tasks <= 3, got {}",
            max_reached
        );
        Ok(())
    }

    /// Smoke test: 200 async observers under the default configuration can be
    /// subscribed and notified without any call returning an error.
    #[tokio::test]
    async fn test_default_concurrent_limit() -> Result<(), PropertyError> {
        let prop = ObservableProperty::new(0);

        for _ in 0..200 {
            prop.subscribe_async(|_, _| async move {
                sleep(Duration::from_millis(50)).await;
            })?;
        }

        prop.set_async(42).await?;
        sleep(Duration::from_millis(300)).await;
        Ok(())
    }

    /// Sync and async observers registered on the same property are all
    /// invoked once by a single `set_async`, even with a low async limit.
    #[tokio::test]
    async fn test_mixed_sync_and_async_observers() -> Result<(), PropertyError> {
        let limits = PropertyConfig {
            max_observers: 100,
            max_pending_notifications: 100,
            observer_timeout_ms: 5000,
            max_concurrent_async_tasks: 2,
        };
        let prop = ObservableProperty::new_with_config(0, limits);
        let sync_hits = Arc::new(AtomicUsize::new(0));
        let async_hits = Arc::new(AtomicUsize::new(0));

        for _ in 0..5 {
            let sync_hits = Arc::clone(&sync_hits);
            prop.subscribe(Arc::new(move |_, _| {
                sync_hits.fetch_add(1, Ordering::SeqCst);
            }))?;
        }
        for _ in 0..5 {
            let async_hits = Arc::clone(&async_hits);
            prop.subscribe_async(move |_, _| {
                let async_hits = Arc::clone(&async_hits);
                async move {
                    sleep(Duration::from_millis(20)).await;
                    async_hits.fetch_add(1, Ordering::SeqCst);
                }
            })?;
        }

        prop.set_async(100).await?;
        sleep(Duration::from_millis(150)).await;

        assert_eq!(sync_hits.load(Ordering::SeqCst), 5);
        assert_eq!(async_hits.load(Ordering::SeqCst), 5);
        Ok(())
    }
}