observable_property_tokio/lib.rs
1//! # Observable Property with Tokio
2//!
3//! A thread-safe, async-compatible observable property implementation for Rust that allows you to
4//! observe changes to values using Tokio for asynchronous operations.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access with optimized locking
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with Tokio tasks
12//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property_tokio::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), observable_property_tokio::PropertyError> {
23//! // Create an observable property
24//! let property = ObservableProperty::new(42);
25//!
26//! // Subscribe to changes
27//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
28//! println!("Value changed from {} to {}", old_value, new_value);
29//! }))?;
30//!
31//! // Change the value (triggers observer)
32//! property.set(100)?;
33//!
34//! // For async notification (uses Tokio)
35//! property.set_async(200).await?;
36//!
37//! // Unsubscribe when done
38//! property.unsubscribe(observer_id)?;
39//!
40//! Ok(())
41//! }
42//! ```
43//!
44//! ## Multi-threading Example with Tokio
45//!
46//! ```rust
47//! use observable_property_tokio::ObservableProperty;
48//! use std::sync::Arc;
49//! use tokio::task;
50//!
51//! #[tokio::main]
52//! async fn main() -> Result<(), observable_property_tokio::PropertyError> {
53//! let property = Arc::new(ObservableProperty::new(0));
54//! let property_clone = property.clone();
55//!
56//! // Subscribe from one task
57//! property.subscribe(Arc::new(|old, new| {
58//! println!("Value changed: {} -> {}", old, new);
59//! }))?;
60//!
61//! // Modify from another task
62//! task::spawn(async move {
63//! property_clone.set(42)?;
64//! Ok::<_, observable_property_tokio::PropertyError>(())
65//! }).await??;
66//!
67//! Ok(())
68//! }
69//! ```
70
71use std::collections::HashMap;
72use std::fmt;
73use std::panic;
74use std::sync::Arc;
75use std::time::{SystemTime, UNIX_EPOCH};
76use parking_lot::RwLock;
77use thiserror::Error;
78use tokio::task::{self, JoinError};
79
/// Errors that can occur when working with `ObservableProperty`.
///
/// Every variant owns its data (`String`s, integers and [`ObserverId`]), which is
/// what lets the enum derive `Clone`. The `#[error(...)]` attributes define the
/// `Display` text of each variant; [`PropertyError::diagnostic_info`] renders the
/// same fields as a pipe-delimited line for logs.
#[derive(Error, Debug, Clone)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property.
    ///
    /// [`PropertyError::read_lock_error`] builds this variant and fills in
    /// `timestamp_ms`.
    #[error("Failed to acquire read lock during '{operation}': {context}")]
    ReadLockError {
        /// The operation being attempted
        operation: String,
        /// Context describing what operation was being attempted
        context: String,
        /// Timestamp when error occurred (milliseconds since epoch)
        timestamp_ms: u64,
    },

    /// Failed to acquire a write lock on the property.
    ///
    /// [`PropertyError::write_lock_error`] builds this variant and fills in
    /// `timestamp_ms`.
    #[error("Failed to acquire write lock during '{operation}': {context}")]
    WriteLockError {
        /// The operation being attempted
        operation: String,
        /// Context describing what operation was being attempted
        context: String,
        /// Timestamp when error occurred (milliseconds since epoch)
        timestamp_ms: u64,
    },

    /// Attempted to unsubscribe an observer that doesn't exist.
    ///
    /// Returned by `ObservableProperty::unsubscribe` when no observer is
    /// registered under `id`.
    #[error("Observer with ID {id} not found")]
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: ObserverId,
    },

    /// The property's lock has been poisoned due to a panic in another thread.
    ///
    /// NOTE(review): the lock imported by this file is `parking_lot::RwLock`, and
    /// the visible call sites use the returned guards directly without handling a
    /// poisoning result — confirm whether this variant is still produced anywhere.
    #[error("Lock poisoned during '{operation}': {context}")]
    LockPoisoned {
        /// The operation that encountered the poisoned lock
        operation: String,
        /// Additional context about the poisoned lock
        context: String,
        /// Timestamp when error occurred (milliseconds since epoch)
        timestamp_ms: u64,
    },

    /// An observer function panicked during execution.
    ///
    /// [`PropertyError::observer_panic`] builds this variant and fills in
    /// `timestamp_ms`.
    #[error("Observer {observer_id} panicked: {error}")]
    ObserverPanic {
        /// The ID of the observer that panicked
        observer_id: ObserverId,
        /// The panic error message
        error: String,
        /// Timestamp when error occurred (milliseconds since epoch)
        timestamp_ms: u64,
    },

    /// An observer function encountered an error during execution
    #[error("Observer execution failed: {reason}")]
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },

    /// A Tokio-related error occurred
    #[error("Tokio runtime error: {reason}")]
    TokioError {
        /// Description of what went wrong
        reason: String,
    },

    /// A task join error occurred.
    ///
    /// `ObservableProperty::set_async` returns this when awaiting one of its
    /// spawned observer tasks fails. The payload is a textual description; the
    /// `Display` output already prefixes it with "Task join error: ".
    #[error("Task join error: {0}")]
    JoinError(String),

    /// Maximum capacity has been exceeded.
    ///
    /// Returned by `ObservableProperty::subscribe` once the number of registered
    /// observers has reached `PropertyConfig::max_observers`.
    #[error("Capacity exceeded: current={current}, max={max}, resource={resource}")]
    CapacityExceeded {
        /// Current count
        current: usize,
        /// Maximum allowed
        max: usize,
        /// The resource that exceeded capacity
        resource: String,
    },

    /// Operation exceeded timeout threshold
    #[error("Operation '{operation}' timed out: {elapsed_ms}ms > {threshold_ms}ms")]
    OperationTimeout {
        /// The operation that timed out
        operation: String,
        /// Actual elapsed time in milliseconds
        elapsed_ms: u64,
        /// Timeout threshold in milliseconds
        threshold_ms: u64,
    },

    /// The property is shutting down and not accepting new operations
    #[error("Property is shutting down")]
    ShutdownInProgress,
}
178
179impl PropertyError {
180 /// Get a diagnostic string suitable for logging and monitoring
181 ///
182 /// This method returns a structured string containing all relevant
183 /// diagnostic information about the error, including timestamps,
184 /// operation context, and performance metrics where applicable.
185 ///
186 /// # Returns
187 ///
188 /// A formatted string containing diagnostic information
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use observable_property_tokio::PropertyError;
194 ///
195 /// let error = PropertyError::OperationTimeout {
196 /// operation: "notify_observers".to_string(),
197 /// elapsed_ms: 5500,
198 /// threshold_ms: 5000,
199 /// };
200 ///
201 /// let diagnostic = error.diagnostic_info();
202 /// assert!(diagnostic.contains("notify_observers"));
203 /// assert!(diagnostic.contains("elapsed_ms=5500"));
204 /// ```
205 pub fn diagnostic_info(&self) -> String {
206 match self {
207 Self::ReadLockError { operation, context, timestamp_ms } => {
208 format!(
209 "READ_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
210 operation, context, timestamp_ms
211 )
212 }
213 Self::WriteLockError { operation, context, timestamp_ms } => {
214 format!(
215 "WRITE_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
216 operation, context, timestamp_ms
217 )
218 }
219 Self::LockPoisoned { operation, context, timestamp_ms } => {
220 format!(
221 "LOCK_POISONED | operation={} | context={} | timestamp_ms={}",
222 operation, context, timestamp_ms
223 )
224 }
225 Self::ObserverPanic { observer_id, error, timestamp_ms } => {
226 format!(
227 "OBSERVER_PANIC | observer_id={} | error={} | timestamp_ms={}",
228 observer_id, error, timestamp_ms
229 )
230 }
231 Self::ObserverNotFound { id } => {
232 format!("OBSERVER_NOT_FOUND | id={}", id)
233 }
234 Self::CapacityExceeded { current, max, resource } => {
235 format!(
236 "CAPACITY_EXCEEDED | resource={} | current={} | max={} | utilization={:.1}%",
237 resource, current, max, (*current as f64 / *max as f64) * 100.0
238 )
239 }
240 Self::OperationTimeout { operation, elapsed_ms, threshold_ms } => {
241 format!(
242 "OPERATION_TIMEOUT | operation={} | elapsed_ms={} | threshold_ms={} | overage_ms={}",
243 operation, elapsed_ms, threshold_ms, elapsed_ms.saturating_sub(*threshold_ms)
244 )
245 }
246 Self::ShutdownInProgress => {
247 "SHUTDOWN_IN_PROGRESS | property is shutting down".to_string()
248 }
249 Self::ObserverError { reason } => {
250 format!("OBSERVER_ERROR | reason={}", reason)
251 }
252 Self::TokioError { reason } => {
253 format!("TOKIO_ERROR | reason={}", reason)
254 }
255 Self::JoinError(msg) => {
256 format!("JOIN_ERROR | message={}", msg)
257 }
258 }
259 }
260
261 /// Get the current timestamp in milliseconds since UNIX epoch
262 ///
263 /// This is a helper function used internally for error creation
264 fn current_timestamp_ms() -> u64 {
265 SystemTime::now()
266 .duration_since(UNIX_EPOCH)
267 .map(|d| d.as_millis() as u64)
268 .unwrap_or(0)
269 }
270
271 /// Create a ReadLockError with current timestamp
272 pub fn read_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
273 Self::ReadLockError {
274 operation: operation.into(),
275 context: context.into(),
276 timestamp_ms: Self::current_timestamp_ms(),
277 }
278 }
279
280 /// Create a WriteLockError with current timestamp
281 pub fn write_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
282 Self::WriteLockError {
283 operation: operation.into(),
284 context: context.into(),
285 timestamp_ms: Self::current_timestamp_ms(),
286 }
287 }
288
289 /// Create a LockPoisoned error with current timestamp
290 pub fn lock_poisoned(operation: impl Into<String>, context: impl Into<String>) -> Self {
291 Self::LockPoisoned {
292 operation: operation.into(),
293 context: context.into(),
294 timestamp_ms: Self::current_timestamp_ms(),
295 }
296 }
297
298 /// Create an ObserverPanic error with current timestamp
299 pub fn observer_panic(observer_id: ObserverId, error: impl Into<String>) -> Self {
300 Self::ObserverPanic {
301 observer_id,
302 error: error.into(),
303 timestamp_ms: Self::current_timestamp_ms(),
304 }
305 }
306}
307
/// Configuration options for ObservableProperty
///
/// This struct allows you to configure limits and behavior for an observable property,
/// helping prevent resource exhaustion and enabling production-grade backpressure handling.
/// Pass it to `ObservableProperty::new_with_config`; `ObservableProperty::new` uses
/// `PropertyConfig::default()`.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::{ObservableProperty, PropertyConfig};
///
/// let config = PropertyConfig {
///     max_observers: 100,
///     max_pending_notifications: 50,
///     observer_timeout_ms: 5000,
///     max_concurrent_async_tasks: 50,
/// };
///
/// let property = ObservableProperty::new_with_config(42, config);
/// ```
#[derive(Debug, Clone)]
pub struct PropertyConfig {
    /// Maximum number of observers allowed
    ///
    /// When this limit is reached, attempts to subscribe additional observers
    /// will return a `PropertyError::CapacityExceeded` error.
    ///
    /// Default: 1000
    pub max_observers: usize,

    /// Maximum pending async notifications per observer (reserved for future use)
    ///
    /// This limit helps prevent memory exhaustion from queued notifications.
    ///
    /// Default: 100
    pub max_pending_notifications: usize,

    /// Timeout for observer execution in milliseconds (reserved for future use)
    ///
    /// Observers that exceed this threshold may be logged or flagged for debugging.
    ///
    /// Default: 5000ms
    pub observer_timeout_ms: u64,

    /// Maximum number of concurrent async observer tasks
    ///
    /// This limit prevents resource exhaustion by limiting the number of
    /// async observer notifications that can execute simultaneously.
    /// When the limit is reached, new async notifications will wait until
    /// a slot becomes available (using a semaphore for coordination).
    ///
    /// The semaphore is sized from this value when the property is constructed,
    /// and every task spawned by `set_async` takes one permit before calling its
    /// observer. Choose a value of at least 1: with no slot available an
    /// observer task has nothing to acquire.
    ///
    /// Default: 100
    pub max_concurrent_async_tasks: usize,
}
361
362impl Default for PropertyConfig {
363 fn default() -> Self {
364 Self {
365 max_observers: 1000,
366 max_pending_notifications: 100,
367 observer_timeout_ms: 5000,
368 max_concurrent_async_tasks: 100,
369 }
370 }
371}
372
/// Report generated after a property shutdown operation
///
/// Contains diagnostic information about the shutdown process,
/// including the number of observers cleared and timing information.
/// `ShutdownReport::diagnostic_info` renders all four fields as a single
/// log line.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::ObservableProperty;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let property = ObservableProperty::new(42);
///
///     // ... use property ...
///
///     let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
///     println!("Shutdown complete: {:?}", report);
///     Ok(())
/// }
/// ```
#[derive(Debug, Clone)]
pub struct ShutdownReport {
    /// Number of observers that were cleared during shutdown
    pub observers_cleared: usize,

    /// Time taken to complete the shutdown operation
    pub shutdown_duration: std::time::Duration,

    /// Whether the shutdown completed within the timeout period
    pub completed_within_timeout: bool,

    /// Timestamp when shutdown was initiated (milliseconds since epoch)
    pub initiated_at_ms: u64,
}
409
410impl ShutdownReport {
411 /// Get a diagnostic string for logging
412 pub fn diagnostic_info(&self) -> String {
413 format!(
414 "SHUTDOWN_COMPLETE | observers_cleared={} | duration_ms={} | within_timeout={} | initiated_at_ms={}",
415 self.observers_cleared,
416 self.shutdown_duration.as_millis(),
417 self.completed_within_timeout,
418 self.initiated_at_ms
419 )
420 }
421}
422
/// Function type for observers that get called when property values change
///
/// The callback receives `(&old_value, &new_value)`. It is reference-counted
/// (`Arc`) so that a snapshot of the registered observers can be cloned cheaply
/// before they are invoked, and `Send + Sync` so it can be called from other
/// threads or Tokio tasks.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
425
/// Unique identifier for registered observers
///
/// A newtype over `usize`. `subscribe()` hands out values from a per-property
/// counter; pass the ID back to `unsubscribe()` / `try_unsubscribe()` to remove
/// the observer. The inner number is only visible inside this crate; use the
/// `From` conversions to go to and from `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObserverId(pub(crate) usize);
429
430impl From<ObserverId> for usize {
431 /// Convert an ObserverId to a usize
432 ///
433 /// This allows backward compatibility with code that expects to use
434 /// the ID as a regular number.
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// use observable_property_tokio::ObserverId;
440 ///
441 /// let id = ObserverId::from(42); // For illustration - actual IDs come from subscribe()
442 /// let value: usize = id.into();
443 /// assert_eq!(value, 42);
444 /// ```
445 fn from(id: ObserverId) -> Self {
446 id.0
447 }
448}
449
450impl From<usize> for ObserverId {
451 /// Create an ObserverId from a usize
452 ///
453 /// This is primarily for backward compatibility and testing.
454 /// In normal usage, IDs are created by the library through subscribe() calls.
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// use observable_property_tokio::ObserverId;
460 ///
461 /// let id = ObserverId::from(42);
462 /// ```
463 fn from(value: usize) -> Self {
464 ObserverId(value)
465 }
466}
467
468impl fmt::Display for ObserverId {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 write!(f, "{}", self.0)
471 }
472}
473
/// A thread-safe observable property that notifies observers when its value changes
///
/// This type wraps a value of type `T` and allows multiple observers to be notified
/// whenever the value is modified. All operations are thread-safe and can be called
/// from multiple threads concurrently. Asynchronous operations are powered by Tokio.
///
/// The value, the observer registry and the ID counter live together behind one
/// `Arc<RwLock<..>>`; readers (`get`, `get_ref`, `observer_count`) take the read
/// lock, everything that mutates takes the write lock.
///
/// # Type Requirements
///
/// The struct itself places no bounds on `T`, but its methods are implemented for
/// types that are:
/// - `Clone`: Required for returning values and passing them to observers
/// - `Send`: Required for transferring between threads
/// - `Sync`: Required for concurrent access from multiple threads
/// - `'static`: Required for observer callbacks that may outlive the original scope
///
/// # Examples
///
/// ```rust
/// use observable_property_tokio::ObservableProperty;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
///     let property = ObservableProperty::new("initial".to_string());
///
///     let observer_id = property.subscribe(Arc::new(|old, new| {
///         println!("Changed from '{}' to '{}'", old, new);
///     }))?;
///
///     property.set("updated".to_string())?; // Prints: Changed from 'initial' to 'updated'
///
///     // Async version
///     property.set_async("async update".to_string()).await?;
///
///     property.unsubscribe(observer_id)?;
///
///     Ok(())
/// }
/// ```
pub struct ObservableProperty<T> {
    // Current value, registered observers and the next observer ID, guarded by one lock.
    inner: Arc<RwLock<InnerProperty<T>>>,
    // Limits chosen at construction time; `subscribe` enforces `max_observers`.
    config: PropertyConfig,
    // Tasks spawned by `set_async` acquire a permit here before running an observer.
    async_task_semaphore: Arc<tokio::sync::Semaphore>,
}
517
/// Lock-protected state shared by a property and the handle/subscription types.
struct InnerProperty<T> {
    /// The current value of the property.
    value: T,
    /// Registered observers, keyed by the ID returned from `subscribe()`.
    observers: HashMap<ObserverId, Observer<T>>,
    /// Number used for the next `ObserverId`; `subscribe()` increments it after each use.
    next_id: usize,
}
523
/// A handle that shares a property's lock-protected state.
///
/// It holds the same `Arc<RwLock<InnerProperty<T>>>` type that
/// `ObservableProperty` stores, and exposes `try_unsubscribe` for removing
/// observers by ID.
///
/// NOTE(review): the code that constructs a `PropertyHandle` is not in this part
/// of the file — presumably it is handed out by `ObservableProperty`; confirm.
pub struct PropertyHandle<T: Clone + Send + Sync + 'static> {
    // Shared state: value, observer registry and ID counter.
    inner: Arc<RwLock<InnerProperty<T>>>,
}
527
528impl<T: Clone + Send + Sync + 'static> PropertyHandle<T> {
529 /// Removes an observer by its ID, ignoring if it doesn't exist
530 ///
531 /// This is a convenience method that doesn't return an error if the observer doesn't exist.
532 ///
533 /// # Arguments
534 ///
535 /// * `id` - The observer ID returned by `subscribe()`
536 ///
537 /// # Returns
538 ///
539 /// `true` if an observer was removed, `false` if no observer with that ID existed
540 pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
541 let mut inner = self.inner.write();
542 inner.observers.remove(&id).is_some()
543 }
544}
545
/// A guard that unregisters one observer when it is dropped.
///
/// It remembers the shared property state and the observer's ID; its `Drop`
/// implementation removes that ID from the observer registry.
///
/// NOTE(review): the code that creates a `Subscription` is not in this part of
/// the file — confirm which API returns it.
pub struct Subscription<T: Clone + Send + Sync + 'static> {
    // Shared state of the property the observer was registered on.
    inner: Arc<RwLock<InnerProperty<T>>>,
    // ID of the observer to remove on drop.
    id: ObserverId,
}
550
551impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
552 fn drop(&mut self) {
553 let mut inner = self.inner.write();
554 inner.observers.remove(&self.id);
555 }
556}
557
558impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
559 /// Creates a new observable property with the given initial value
560 ///
561 /// Uses default configuration with max_observers=1000.
562 /// For custom limits, use `new_with_config()`.
563 ///
564 /// # Arguments
565 ///
566 /// * `initial_value` - The starting value for this property
567 ///
568 /// # Examples
569 ///
570 /// ```rust
571 /// use observable_property_tokio::ObservableProperty;
572 ///
573 /// let property = ObservableProperty::new(42);
574 /// assert_eq!(property.get().unwrap(), 42);
575 /// ```
576 pub fn new(initial_value: T) -> Self {
577 Self::new_with_config(initial_value, PropertyConfig::default())
578 }
579
580 /// Creates a new observable property with custom configuration
581 ///
582 /// This allows you to specify limits on the number of observers and other
583 /// behavioral parameters to prevent resource exhaustion.
584 ///
585 /// # Arguments
586 ///
587 /// * `initial_value` - The starting value for this property
588 /// * `config` - Configuration options for backpressure and resource limits
589 ///
590 /// # Examples
591 ///
592 /// ```rust
593 /// use observable_property_tokio::{ObservableProperty, PropertyConfig};
594 ///
595 /// let config = PropertyConfig {
596 /// max_observers: 50,
597 /// max_pending_notifications: 100,
598 /// observer_timeout_ms: 3000,
599 /// max_concurrent_async_tasks: 50,
600 /// };
601 ///
602 /// let property = ObservableProperty::new_with_config(0, config);
603 /// assert_eq!(property.get().unwrap(), 0);
604 /// ```
605 pub fn new_with_config(initial_value: T, config: PropertyConfig) -> Self {
606 let semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_async_tasks));
607
608 Self {
609 inner: Arc::new(RwLock::new(InnerProperty {
610 value: initial_value,
611 observers: HashMap::new(),
612 next_id: 0,
613 })),
614 config,
615 async_task_semaphore: semaphore,
616 }
617 }
618
619 /// Gets the current value of the property
620 ///
621 /// This method acquires a read lock, which allows multiple concurrent readers.
622 ///
623 /// # Returns
624 ///
625 /// `Ok(T)` containing a clone of the current value
626 ///
627 /// # Examples
628 ///
629 /// ```rust
630 /// use observable_property_tokio::ObservableProperty;
631 ///
632 /// let property = ObservableProperty::new("hello".to_string());
633 /// assert_eq!(property.get().unwrap(), "hello");
634 /// ```
635 pub fn get(&self) -> Result<T, PropertyError> {
636 Ok(self.inner.read().value.clone())
637 }
638
639 /// Gets a reference to the current value of the property
640 ///
641 /// This method acquires a read lock and returns a guard that derefs to the value,
642 /// allowing you to read the value without cloning it.
643 ///
644 /// # Returns
645 ///
646 /// A RAII guard that derefs to a reference of the value
647 ///
648 /// # Examples
649 ///
650 /// ```rust
651 /// use observable_property_tokio::ObservableProperty;
652 ///
653 /// let property = ObservableProperty::new("hello".to_string());
654 /// let value_ref = property.get_ref();
655 /// assert_eq!(*value_ref, "hello");
656 /// // Lock is released when value_ref goes out of scope
657 /// ```
658 pub fn get_ref(&self) -> impl std::ops::Deref<Target = T> + '_ {
659 parking_lot::RwLockReadGuard::map(self.inner.read(), |inner| &inner.value)
660 }
661
662 /// Sets the property to a new value and notifies all observers
663 ///
664 /// This method will:
665 /// 1. Acquire a write lock (blocking other readers/writers)
666 /// 2. Update the value and capture a snapshot of observers
667 /// 3. Release the lock
668 /// 4. Notify all observers sequentially with the old and new values
669 ///
670 /// Observer notifications are wrapped in panic recovery to prevent one
671 /// misbehaving observer from affecting others.
672 ///
673 /// # Arguments
674 ///
675 /// * `new_value` - The new value to set
676 ///
677 /// # Returns
678 ///
679 /// `Ok(())` if successful
680 ///
681 /// # Examples
682 ///
683 /// ```rust
684 /// use observable_property_tokio::ObservableProperty;
685 /// use std::sync::Arc;
686 ///
687 /// #[tokio::main]
688 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
689 /// let property = ObservableProperty::new(10);
690 ///
691 /// property.subscribe(Arc::new(|old, new| {
692 /// println!("Value changed from {} to {}", old, new);
693 /// }))?;
694 ///
695 /// property.set(20)?; // Triggers observer notification
696 ///
697 /// Ok(())
698 /// }
699 /// ```
700 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
701 let (old_value, observers_snapshot) = {
702 let mut inner = self.inner.write();
703
704 let old_value = inner.value.clone();
705 inner.value = new_value.clone();
706 let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
707 (old_value, observers_snapshot)
708 };
709
710 for observer in observers_snapshot {
711 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
712 observer(&old_value, &new_value);
713 })) {
714 eprintln!("Observer panic: {:?}", e);
715 }
716 }
717
718 Ok(())
719 }
720
721 /// Sets the property to a new value and notifies observers asynchronously using Tokio tasks
722 ///
723 /// This method is similar to `set()` but spawns observers in individual Tokio tasks
724 /// for non-blocking operation. This is useful when observers might perform
725 /// time-consuming operations.
726 ///
727 /// # Arguments
728 ///
729 /// * `new_value` - The new value to set
730 ///
731 /// # Returns
732 ///
733 /// `Ok(())` if successful
734 ///
735 /// # Examples
736 ///
737 /// ```rust
738 /// use observable_property_tokio::ObservableProperty;
739 /// use std::sync::Arc;
740 /// use tokio::time::{sleep, Duration};
741 ///
742 /// #[tokio::main]
743 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
744 /// let property = ObservableProperty::new(0);
745 ///
746 /// property.subscribe(Arc::new(move |old, new| {
747 /// // This observer does slow work but won't block the caller
748 /// println!("Slow observer: {} -> {}", old, new);
749 /// }))?;
750 ///
751 /// // This returns immediately even though observer may be slow
752 /// property.set_async(42).await?;
753 ///
754 /// // Give time for observers to run
755 /// sleep(Duration::from_millis(10)).await;
756 ///
757 /// Ok(())
758 /// }
759 /// ```
760 pub async fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
761 let (old_value, observers_snapshot) = {
762 let mut inner = self.inner.write();
763
764 let old_value = inner.value.clone();
765 inner.value = new_value.clone();
766 let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
767 (old_value, observers_snapshot)
768 };
769
770 if observers_snapshot.is_empty() {
771 return Ok(());
772 }
773
774 // Spawn a separate Tokio task for each observer with semaphore-based connection pooling
775 let mut tasks = Vec::with_capacity(observers_snapshot.len());
776
777 for observer in observers_snapshot {
778 let old_val = old_value.clone();
779 let new_val = new_value.clone();
780 let semaphore = Arc::clone(&self.async_task_semaphore);
781
782 let task = task::spawn(async move {
783 // Acquire permit from semaphore before executing observer
784 let _permit = semaphore.acquire().await.expect("Semaphore closed");
785
786 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
787 observer(&old_val, &new_val);
788 })) {
789 eprintln!("Observer panic in task: {:?}", e);
790 }
791 // Permit is automatically released when _permit is dropped
792 });
793
794 tasks.push(task);
795 }
796
797 // Wait for all tasks to complete to prevent resource leaks
798 for task in tasks {
799 task.await.map_err(|e| PropertyError::JoinError(format!("Task join error: {}", e)))?;
800 }
801
802 Ok(())
803 }
804
805 /// Update the property value using a closure that has access to the current value
806 ///
807 /// This is a more ergonomic way to update a property based on its current value,
808 /// without having to call `get()` and `set()` separately.
809 ///
810 /// # Arguments
811 ///
812 /// * `update_fn` - A function that takes the current value and returns a new value
813 ///
814 /// # Returns
815 ///
816 /// `Ok(())` if successful
817 ///
818 /// # Examples
819 ///
820 /// ```rust
821 /// use observable_property_tokio::ObservableProperty;
822 ///
823 /// let property = ObservableProperty::new(10);
824 ///
825 /// // Double the value
826 /// property.update(|current| current * 2)?;
827 /// assert_eq!(property.get()?, 20);
828 ///
829 /// // Add 5
830 /// property.update(|current| current + 5)?;
831 /// assert_eq!(property.get()?, 25);
832 /// # Ok::<(), observable_property_tokio::PropertyError>(())
833 /// ```
834 pub fn update<F>(&self, update_fn: F) -> Result<(), PropertyError>
835 where
836 F: FnOnce(T) -> T,
837 {
838 let new_value = update_fn(self.get()?);
839 self.set(new_value)
840 }
841
842 /// Update the property value asynchronously using a closure
843 ///
844 /// Like `update()` but uses `set_async()` for the update.
845 ///
846 /// # Arguments
847 ///
848 /// * `update_fn` - A function that takes the current value and returns a new value
849 ///
850 /// # Returns
851 ///
852 /// `Ok(())` if successful
853 ///
854 /// # Examples
855 ///
856 /// ```rust
857 /// use observable_property_tokio::ObservableProperty;
858 ///
859 /// #[tokio::main]
860 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
861 /// let property = ObservableProperty::new("hello".to_string());
862 ///
863 /// property.update_async(|current| format!("{} world", current)).await?;
864 /// assert_eq!(property.get()?, "hello world");
865 ///
866 /// Ok(())
867 /// }
868 /// ```
869 pub async fn update_async<F>(&self, update_fn: F) -> Result<(), PropertyError>
870 where
871 F: FnOnce(T) -> T,
872 {
873 let new_value = update_fn(self.get()?);
874 self.set_async(new_value).await
875 }
876
877 /// Subscribes an observer function to be called when the property changes
878 ///
879 /// The observer function will be called with the old and new values whenever
880 /// the property is modified via `set()` or `set_async()`.
881 ///
882 /// # Arguments
883 ///
884 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
885 ///
886 /// # Returns
887 ///
888 /// `Ok(ObserverId)` containing a unique identifier for this observer
889 ///
890 /// # Examples
891 ///
892 /// ```rust
893 /// use observable_property_tokio::ObservableProperty;
894 /// use std::sync::Arc;
895 ///
896 /// #[tokio::main]
897 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
898 /// let property = ObservableProperty::new(0);
899 ///
900 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
901 /// println!("Property changed from {} to {}", old_value, new_value);
902 /// }))?;
903 ///
904 /// // Later, unsubscribe using the returned ID
905 /// property.unsubscribe(observer_id)?;
906 ///
907 /// Ok(())
908 /// }
909 /// ```
910 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
911 let mut inner = self.inner.write();
912
913 // Check if we've reached the maximum number of observers
914 if inner.observers.len() >= self.config.max_observers {
915 return Err(PropertyError::CapacityExceeded {
916 current: inner.observers.len(),
917 max: self.config.max_observers,
918 resource: "observers".to_string(),
919 });
920 }
921
922 let id = ObserverId(inner.next_id);
923 inner.next_id += 1;
924 inner.observers.insert(id, observer);
925 Ok(id)
926 }
927
928 /// Removes an observer by its ID
929 ///
930 /// # Arguments
931 ///
932 /// * `id` - The observer ID returned by `subscribe()`
933 ///
934 /// # Returns
935 ///
936 /// `Ok(())` if the observer was removed, or `Err(PropertyError::ObserverNotFound)`
937 /// if no observer with that ID existed.
938 ///
939 /// # Examples
940 ///
941 /// ```rust
942 /// use observable_property_tokio::ObservableProperty;
943 /// use std::sync::Arc;
944 ///
945 /// #[tokio::main]
946 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
947 /// let property = ObservableProperty::new(0);
948 /// let id = property.subscribe(Arc::new(|_, _| {}))?;
949 ///
950 /// // Remove the observer
951 /// property.unsubscribe(id)?;
952 ///
953 /// // Trying to remove again fails with ObserverNotFound
954 /// match property.unsubscribe(id) {
955 /// Err(observable_property_tokio::PropertyError::ObserverNotFound { .. }) => {
956 /// println!("Observer was already removed, as expected");
957 /// }
958 /// _ => panic!("Expected ObserverNotFound error"),
959 /// }
960 ///
961 /// Ok(())
962 /// }
963 /// ```
964 pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
965 let mut inner = self.inner.write();
966
967 if inner.observers.remove(&id).is_some() {
968 Ok(())
969 } else {
970 Err(PropertyError::ObserverNotFound { id })
971 }
972 }
973
974 /// Removes an observer by its ID, ignoring if it doesn't exist
975 ///
976 /// This is a convenience method that doesn't return an error if the observer doesn't exist.
977 ///
978 /// # Arguments
979 ///
980 /// * `id` - The observer ID returned by `subscribe()`
981 ///
982 /// # Returns
983 ///
984 /// `true` if an observer was removed, `false` if no observer with that ID existed
985 ///
986 /// # Examples
987 ///
988 /// ```rust
989 /// use observable_property_tokio::ObservableProperty;
990 /// use std::sync::Arc;
991 ///
992 /// #[tokio::main]
993 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
994 /// let property = ObservableProperty::new(0);
995 /// let id = property.subscribe(Arc::new(|_, _| {}))?;
996 ///
997 /// // Remove the observer
998 /// assert!(property.try_unsubscribe(id));
999 ///
1000 /// // Trying to remove again just returns false
1001 /// assert!(!property.try_unsubscribe(id));
1002 ///
1003 /// Ok(())
1004 /// }
1005 /// ```
1006 pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
1007 let mut inner = self.inner.write();
1008 inner.observers.remove(&id).is_some()
1009 }
1010
1011 /// Returns the number of active observers for this property
1012 ///
1013 /// # Returns
1014 ///
1015 /// The number of observers currently subscribed to this property
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```rust
1020 /// use observable_property_tokio::ObservableProperty;
1021 /// use std::sync::Arc;
1022 ///
1023 /// #[tokio::main]
1024 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1025 /// let property = ObservableProperty::new(0);
1026 ///
1027 /// assert_eq!(property.observer_count(), 0);
1028 ///
1029 /// let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1030 /// let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1031 ///
1032 /// assert_eq!(property.observer_count(), 2);
1033 ///
1034 /// property.unsubscribe(id1)?;
1035 ///
1036 /// assert_eq!(property.observer_count(), 1);
1037 ///
1038 /// property.unsubscribe(id2)?;
1039 ///
1040 /// assert_eq!(property.observer_count(), 0);
1041 ///
1042 /// Ok(())
1043 /// }
1044 /// ```
1045 pub fn observer_count(&self) -> usize {
1046 self.inner.read().observers.len()
1047 }
1048
1049 /// Subscribes an observer that only gets called when a filter condition is met
1050 ///
1051 /// This is useful for observing only specific types of changes, such as
1052 /// when a value increases or crosses a threshold.
1053 ///
1054 /// # Arguments
1055 ///
1056 /// * `observer` - The observer function to call when the filter passes
1057 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1058 ///
1059 /// # Returns
1060 ///
1061 /// `Ok(ObserverId)` for the filtered observer
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```rust
1066 /// use observable_property_tokio::ObservableProperty;
1067 /// use std::sync::Arc;
1068 ///
1069 /// #[tokio::main]
1070 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1071 /// let property = ObservableProperty::new(0);
1072 ///
1073 /// // Only notify when value increases
1074 /// let id = property.subscribe_filtered(
1075 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1076 /// |old, new| new > old
1077 /// )?;
1078 ///
1079 /// property.set(10)?; // Triggers observer (0 -> 10)
1080 /// property.set(5)?; // Does NOT trigger observer (10 -> 5)
1081 /// property.set(15)?; // Triggers observer (5 -> 15)
1082 ///
1083 /// Ok(())
1084 /// }
1085 /// ```
1086 pub fn subscribe_filtered<F>(
1087 &self,
1088 observer: Observer<T>,
1089 filter: F,
1090 ) -> Result<ObserverId, PropertyError>
1091 where
1092 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1093 {
1094 let filter = Arc::new(filter);
1095 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1096 if filter(old_val, new_val) {
1097 observer(old_val, new_val);
1098 }
1099 });
1100
1101 self.subscribe(filtered_observer)
1102 }
1103
1104 /// Subscribe with an async handler that will be executed as a Tokio task
1105 ///
1106 /// This version allows you to use async functions as observers. The handler is
1107 /// spawned as a Tokio task whenever the property changes.
1108 ///
1109 /// # Arguments
1110 ///
1111 /// * `handler` - An async function or closure that takes old and new values
1112 ///
1113 /// # Returns
1114 ///
1115 /// `Ok(ObserverId)` containing a unique identifier for this observer
1116 ///
1117 /// # Examples
1118 ///
1119 /// ```rust
1120 /// use observable_property_tokio::ObservableProperty;
1121 /// use std::sync::Arc;
1122 /// use tokio::time::{sleep, Duration};
1123 ///
1124 /// #[tokio::main]
1125 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1126 /// let property = ObservableProperty::new(0);
1127 ///
1128 /// // This handler can perform async operations
1129 /// property.subscribe_async(|old, new| async move {
1130 /// // Simulate some async work
1131 /// sleep(Duration::from_millis(10)).await;
1132 /// println!("Async observer: {} -> {}", old, new);
1133 /// })?;
1134 ///
1135 /// property.set_async(42).await?;
1136 ///
1137 /// // Give time for observers to complete
1138 /// sleep(Duration::from_millis(20)).await;
1139 ///
1140 /// Ok(())
1141 /// }
1142 /// ```
1143 pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
1144 where
1145 F: Fn(T, T) -> Fut + Send + Sync + 'static,
1146 Fut: std::future::Future<Output = ()> + Send + 'static,
1147 {
1148 // Wrap the handler in an Arc so we can clone it for each invocation
1149 let handler = Arc::new(handler);
1150 let semaphore = Arc::clone(&self.async_task_semaphore);
1151
1152 let observer = Arc::new(move |old: &T, new: &T| {
1153 let old_val = old.clone();
1154 let new_val = new.clone();
1155 // Clone the handler so we can move it into the task
1156 let handler_clone = Arc::clone(&handler);
1157 let semaphore_clone = Arc::clone(&semaphore);
1158
1159 tokio::spawn(async move {
1160 // Acquire permit from semaphore before executing async handler
1161 let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
1162 handler_clone(old_val, new_val).await;
1163 // Permit is automatically released when _permit is dropped
1164 });
1165 });
1166
1167 self.subscribe(observer)
1168 }
1169
1170 /// Subscribe with an async handler that is filtered
1171 ///
1172 /// Combines `subscribe_filtered` and `subscribe_async` to provide an async handler
1173 /// that only runs when the filter condition is met.
1174 ///
1175 /// # Arguments
1176 ///
1177 /// * `handler` - An async function or closure that takes old and new values
1178 /// * `filter` - A predicate function that decides if the handler should be called
1179 ///
1180 /// # Returns
1181 ///
1182 /// `Ok(ObserverId)` containing a unique identifier for this observer
1183 ///
1184 /// # Examples
1185 ///
1186 /// ```rust
1187 /// use observable_property_tokio::ObservableProperty;
1188 /// use tokio::time::{sleep, Duration};
1189 ///
1190 /// #[tokio::main]
1191 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1192 /// let property = ObservableProperty::new(0);
1193 ///
1194 /// // This async handler only runs when value increases
1195 /// property.subscribe_async_filtered(
1196 /// |old, new| async move {
1197 /// sleep(Duration::from_millis(10)).await;
1198 /// println!("Value increased: {} -> {}", old, new);
1199 /// },
1200 /// |old, new| new > old
1201 /// )?;
1202 ///
1203 /// property.set(10)?; // Triggers observer (0 -> 10)
1204 /// property.set(5)?; // Does NOT trigger observer (10 -> 5)
1205 /// property.set(15)?; // Triggers observer (5 -> 15)
1206 ///
1207 /// // Give time for observers to complete
1208 /// sleep(Duration::from_millis(20)).await;
1209 ///
1210 /// Ok(())
1211 /// }
1212 /// ```
1213 pub fn subscribe_async_filtered<F, Fut, Filt>(
1214 &self,
1215 handler: F,
1216 filter: Filt,
1217 ) -> Result<ObserverId, PropertyError>
1218 where
1219 F: Fn(T, T) -> Fut + Send + Sync + 'static,
1220 Fut: std::future::Future<Output = ()> + Send + 'static,
1221 Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
1222 {
1223 let filter = Arc::new(filter);
1224 let handler = Arc::new(handler);
1225 let semaphore = Arc::clone(&self.async_task_semaphore);
1226
1227 let observer = Arc::new(move |old: &T, new: &T| {
1228 if filter(old, new) {
1229 let old_val = old.clone();
1230 let new_val = new.clone();
1231 let handler_clone = Arc::clone(&handler);
1232 let semaphore_clone = Arc::clone(&semaphore);
1233
1234 tokio::spawn(async move {
1235 // Acquire permit from semaphore before executing async handler
1236 let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
1237 handler_clone(old_val, new_val).await;
1238 // Permit is automatically released when _permit is dropped
1239 });
1240 }
1241 });
1242
1243 self.subscribe(observer)
1244 }
1245
1246 /// Create a new ObservableProperty with transformation applied to the value
1247 ///
    /// This creates a derived property that tracks changes to the original property,
    /// but with a transformation applied. Changes to the original property are reflected
    /// in the derived property, but not the other way around: setting the derived property
    /// never affects the original, and the next change to the original overwrites it.
1251 ///
1252 /// # Arguments
1253 ///
1254 /// * `transform` - A function that converts from the source type to the target type
1255 ///
1256 /// # Returns
1257 ///
1258 /// `Ok(ObservableProperty<U>)` containing a new property that reflects the transformed value,
1259 /// or `Err(PropertyError)` if the initial value cannot be read or the observer cannot be subscribed
1260 ///
1261 /// # Examples
1262 ///
1263 /// ```rust
1264 /// use observable_property_tokio::ObservableProperty;
1265 /// use std::sync::Arc;
1266 ///
1267 /// #[tokio::main]
1268 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1269 /// let original = ObservableProperty::new(42);
1270 ///
1271 /// // Create a derived property that doubles the value
1272 /// let doubled = original.map(|value| value * 2)?;
1273 ///
1274 /// assert_eq!(doubled.get()?, 84);
1275 ///
1276 /// // When original changes, doubled reflects the transformation
1277 /// original.set(10)?;
1278 /// assert_eq!(doubled.get()?, 20);
1279 ///
1280 /// Ok(())
1281 /// }
1282 /// ```
1283 pub fn map<U, F>(&self, transform: F) -> Result<ObservableProperty<U>, PropertyError>
1284 where
1285 U: Clone + Send + Sync + 'static,
1286 F: Fn(&T) -> U + Send + Sync + 'static,
1287 {
1288 let transform = Arc::new(transform);
1289 let initial_value = transform(&self.get()?);
1290 let derived = ObservableProperty::new(initial_value);
1291
1292 let derived_clone = derived.clone();
1293 self.subscribe(Arc::new(move |_, new_value| {
1294 let transformed = transform(new_value);
1295 if let Err(e) = derived_clone.set(transformed) {
1296 eprintln!("Failed to update derived property: {}", e);
1297 }
1298 }))?;
1299
1300 Ok(derived)
1301 }
1302
1303 /// Removes all registered observers from this property
1304 ///
1305 /// This method clears all observers that have been registered via `subscribe()`,
1306 /// `subscribe_async()`, `subscribe_filtered()`, or `subscribe_async_filtered()`.
1307 /// After calling this method, no observers will be notified of value changes
1308 /// until new ones are registered.
1309 ///
1310 /// This is useful for cleanup scenarios or when you need to reset the observer
1311 /// state without creating a new property instance.
1312 ///
1313 /// # Returns
1314 ///
1315 /// `Ok(())` if successful
1316 ///
1317 /// # Examples
1318 ///
1319 /// ```rust
1320 /// use observable_property_tokio::ObservableProperty;
1321 /// use std::sync::Arc;
1322 ///
1323 /// #[tokio::main]
1324 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1325 /// let property = ObservableProperty::new(42);
1326 ///
1327 /// // Register some observers
1328 /// property.subscribe(Arc::new(|old, new| {
1329 /// println!("Value changed from {} to {}", old, new);
1330 /// }))?;
1331 ///
1332 /// assert_eq!(property.observer_count(), 1);
1333 ///
1334 /// // Clear all observers
1335 /// property.clear_observers()?;
1336 /// assert_eq!(property.observer_count(), 0);
1337 ///
1338 /// // Setting value now won't trigger any observers
1339 /// property.set(100)?;
1340 ///
1341 /// Ok(())
1342 /// }
1343 /// ```
1344 pub fn clear_observers(&self) -> Result<(), PropertyError> {
1345 let mut inner = self.inner.write();
1346 inner.observers.clear();
1347 Ok(())
1348 }
1349
1350 /// Performs cleanup operations on this property
1351 ///
1352 /// This method clears all registered observers, effectively shutting down
1353 /// the property's observer functionality. This is particularly useful in
1354 /// production environments where you need to ensure proper resource cleanup
1355 /// during application shutdown or when disposing of property instances.
1356 ///
1357 /// Currently, this method performs the same operation as `clear_observers()`,
1358 /// but it's provided as a separate method to allow for future expansion
1359 /// of cleanup operations (such as canceling pending async tasks).
1360 ///
1361 /// # Returns
1362 ///
1363 /// `Ok(())` if successful
1364 ///
1365 /// # Examples
1366 ///
1367 /// ```rust
1368 /// use observable_property_tokio::ObservableProperty;
1369 /// use std::sync::Arc;
1370 ///
1371 /// #[tokio::main]
1372 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1373 /// let property = ObservableProperty::new("active".to_string());
1374 ///
1375 /// // Register observers for normal operation
1376 /// property.subscribe(Arc::new(|old, new| {
1377 /// println!("Status changed: {} -> {}", old, new);
1378 /// }))?;
1379 ///
1380 /// // ... normal application usage ...
1381 ///
1382 /// // Shutdown the property when done
1383 /// property.shutdown()?;
1384 ///
1385 /// // Property can still be used for getting/setting values,
1386 /// // but no observers will be notified
1387 /// property.set("inactive".to_string())?;
1388 ///
1389 /// Ok(())
1390 /// }
1391 /// ```
    pub fn shutdown(&self) -> Result<(), PropertyError> {
        // Shutting down currently means dropping every registered observer.
        // Tasks already spawned by async observers are detached, so this call
        // does not cancel them.
        self.clear_observers()
    }
1396
1397 /// Shutdown the property with a timeout, waiting for pending async operations
1398 ///
1399 /// This method performs a comprehensive shutdown that:
1400 /// 1. Clears all observers
    /// 2. Waits for a grace period (the given timeout, capped at 500 ms) to allow pending async operations to complete
1402 /// 3. Returns a detailed report about the shutdown process
1403 ///
1404 /// # Arguments
1405 ///
1406 /// * `timeout` - Maximum duration to wait for shutdown to complete
1407 ///
1408 /// # Returns
1409 ///
1410 /// `Ok(ShutdownReport)` containing shutdown metrics and diagnostics
1411 ///
1412 /// # Examples
1413 ///
1414 /// ```
1415 /// use observable_property_tokio::ObservableProperty;
1416 /// use std::sync::Arc;
1417 /// use std::time::Duration;
1418 ///
1419 /// #[tokio::main]
1420 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1421 /// let property = ObservableProperty::new(0);
1422 ///
1423 /// // Add some async observers
1424 /// property.subscribe_async(|_, new| async move {
1425 /// println!("Async observer: {}", new);
1426 /// })?;
1427 ///
1428 /// property.subscribe(Arc::new(|_, new| {
1429 /// println!("Sync observer: {}", new);
1430 /// }))?;
1431 ///
1432 /// // ... use property ...
1433 ///
1434 /// // Graceful shutdown with timeout
1435 /// let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
1436 ///
1437 /// println!("Shutdown report: {}", report.diagnostic_info());
1438 /// println!("Cleared {} observers in {:?}",
1439 /// report.observers_cleared,
1440 /// report.shutdown_duration);
1441 ///
1442 /// Ok(())
1443 /// }
1444 /// ```
1445 pub async fn shutdown_with_timeout(
1446 &self,
1447 timeout: std::time::Duration,
1448 ) -> Result<ShutdownReport, PropertyError> {
1449 use std::time::{SystemTime, UNIX_EPOCH, Instant};
1450
1451 let start = Instant::now();
1452 let initiated_at_ms = SystemTime::now()
1453 .duration_since(UNIX_EPOCH)
1454 .map(|d| d.as_millis() as u64)
1455 .unwrap_or(0);
1456
1457 // Get initial observer count before clearing
1458 let initial_count = self.observer_count();
1459
1460 // Clear all observers
1461 self.clear_observers()?;
1462
1463 // Wait for pending notifications with timeout
1464 let grace_period = timeout.min(std::time::Duration::from_millis(500));
1465 let completed_within_timeout = tokio::time::timeout(
1466 grace_period,
1467 tokio::time::sleep(grace_period)
1468 ).await.is_ok();
1469
1470 let shutdown_duration = start.elapsed();
1471
1472 Ok(ShutdownReport {
1473 observers_cleared: initial_count,
1474 shutdown_duration,
1475 completed_within_timeout,
1476 initiated_at_ms,
1477 })
1478 }
1479
1480 pub fn subscribe_with_token(&self, observer: Observer<T>) -> Result<Subscription<T>, PropertyError> {
1481 let id = self.subscribe(observer)?;
1482 Ok(Subscription {
1483 inner: Arc::clone(&self.inner),
1484 id
1485 })
1486 }
1487
1488 pub fn subscribe_filtered_with_token<F>(
1489 &self,
1490 observer: Observer<T>,
1491 filter: F,
1492 ) -> Result<Subscription<T>, PropertyError>
1493 where
1494 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1495 {
1496 let id = self.subscribe_filtered(observer, filter)?;
1497 Ok(Subscription {
1498 inner: Arc::clone(&self.inner),
1499 id
1500 })
1501 }
1502
1503 pub fn subscribe_async_with_token<F, Fut>(&self, handler: F) -> Result<Subscription<T>, PropertyError>
1504 where
1505 F: Fn(T, T) -> Fut + Send + Sync + 'static,
1506 Fut: std::future::Future<Output = ()> + Send + 'static,
1507 {
1508 let id = self.subscribe_async(handler)?;
1509 Ok(Subscription {
1510 inner: Arc::clone(&self.inner),
1511 id
1512 })
1513 }
1514
1515 pub fn subscribe_async_filtered_with_token<F, Fut, Filt>(
1516 &self,
1517 handler: F,
1518 filter: Filt,
1519 ) -> Result<Subscription<T>, PropertyError>
1520 where
1521 F: Fn(T, T) -> Fut + Send + Sync + 'static,
1522 Fut: std::future::Future<Output = ()> + Send + 'static,
1523 Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
1524 {
1525 let id = self.subscribe_async_filtered(handler, filter)?;
1526 Ok(Subscription {
1527 inner: Arc::clone(&self.inner),
1528 id
1529 })
1530 }
1531
1532}
1533
1534impl<T: Clone + Send + Sync + 'static + Default> Default for ObservableProperty<T> {
1535 /// Creates a new ObservableProperty with the default value for type T
1536 ///
1537 /// # Examples
1538 ///
1539 /// ```rust
1540 /// use observable_property_tokio::ObservableProperty;
1541 ///
1542 /// let property: ObservableProperty<i32> = Default::default();
1543 /// assert_eq!(property.get().unwrap(), 0); // Default for i32 is 0
1544 /// ```
1545 fn default() -> Self {
1546 Self::new(T::default())
1547 }
1548}
1549
1550impl<T: Clone> Clone for ObservableProperty<T> {
1551 /// Creates a new reference to the same observable property
1552 ///
1553 /// This creates a new `ObservableProperty` instance that shares the same
1554 /// underlying data with the original. Changes made through either instance
1555 /// will be visible to observers subscribed through both instances.
1556 ///
1557 /// # Examples
1558 ///
1559 /// ```rust
1560 /// use observable_property_tokio::ObservableProperty;
1561 /// use std::sync::Arc;
1562 ///
1563 /// #[tokio::main]
1564 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1565 /// let property1 = ObservableProperty::new(42);
1566 /// let property2 = property1.clone();
1567 ///
1568 /// property2.subscribe(Arc::new(|old, new| {
1569 /// println!("Observer on property2 saw change: {} -> {}", old, new);
1570 /// }))?;
1571 ///
1572 /// // This change through property1 will trigger the observer on property2
1573 /// property1.set(100)?;
1574 ///
1575 /// Ok(())
1576 /// }
1577 /// ```
1578 fn clone(&self) -> Self {
1579 Self {
1580 inner: Arc::clone(&self.inner),
1581 config: self.config.clone(),
1582 async_task_semaphore: Arc::clone(&self.async_task_semaphore),
1583 }
1584 }
1585}
1586
1587impl<T: Clone + Send + Sync + 'static> Clone for PropertyHandle<T> {
1588 fn clone(&self) -> Self {
1589 Self {
1590 inner: Arc::clone(&self.inner),
1591 }
1592 }
1593}
1594
1595impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1596 /// Debug implementation that shows the current value if accessible
1597 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1598 match self.get() {
1599 Ok(value) => f.debug_struct("ObservableProperty")
1600 .field("value", &value)
1601 .field("observers_count", &self.observer_count())
1602 .finish(),
1603 Err(_) => f.debug_struct("ObservableProperty")
1604 .field("value", &"[inaccessible]")
1605 .field("observers_count", &self.observer_count())
1606 .finish(),
1607 }
1608 }
1609}
1610
1611#[cfg(test)]
1612mod tests {
1613 use super::*;
1614 use std::sync::atomic::{AtomicUsize, Ordering};
1615 use std::time::Duration;
1616 use tokio::time::sleep;
1617
1618 // Basic tests
1619 #[tokio::test]
1620 async fn test_new_and_get() -> Result<(), PropertyError> {
1621 let property = ObservableProperty::new(42);
1622 assert_eq!(property.get()?, 42);
1623 Ok(())
1624 }
1625
1626 #[tokio::test]
1627 async fn test_default() {
1628 let property: ObservableProperty<String> = Default::default();
1629 assert_eq!(property.get().unwrap(), String::default());
1630 }
1631
1632 #[tokio::test]
1633 async fn test_get_ref() {
1634 let property = ObservableProperty::new("hello".to_string());
1635 let value_ref = property.get_ref();
1636 assert_eq!(*value_ref, "hello");
1637 }
1638
1639 #[tokio::test]
1640 async fn test_set() -> Result<(), PropertyError> {
1641 let property = ObservableProperty::new(10);
1642 property.set(20)?;
1643 assert_eq!(property.get()?, 20);
1644 Ok(())
1645 }
1646
1647 #[tokio::test]
1648 async fn test_update() -> Result<(), PropertyError> {
1649 let property = ObservableProperty::new(10);
1650 property.update(|val| val * 2)?;
1651 assert_eq!(property.get()?, 20);
1652 Ok(())
1653 }
1654
1655 #[tokio::test]
1656 async fn test_update_async() -> Result<(), PropertyError> {
1657 let property = ObservableProperty::new(10);
1658 property.update_async(|val| val * 2).await?;
1659 assert_eq!(property.get()?, 20);
1660 Ok(())
1661 }
1662
1663 #[tokio::test]
1664 async fn test_map() -> Result<(), PropertyError> {
1665 let property = ObservableProperty::new(10);
1666 let derived = property.map(|val| val.to_string())?;
1667
1668 assert_eq!(derived.get()?, "10");
1669
1670 property.set(20)?;
1671 assert_eq!(derived.get()?, "20");
1672 Ok(())
1673 }
1674
1675 #[tokio::test]
1676 async fn test_set_async() -> Result<(), PropertyError> {
1677 let property = ObservableProperty::new("hello".to_string());
1678 property.set_async("world".to_string()).await?;
1679 assert_eq!(property.get()?, "world");
1680 Ok(())
1681 }
1682
1683 #[tokio::test]
1684 async fn test_clone() -> Result<(), PropertyError> {
1685 let property1 = ObservableProperty::new(100);
1686 let property2 = property1.clone();
1687
1688 // Change through property2
1689 property2.set(200)?;
1690
1691 // Both should reflect the change
1692 assert_eq!(property1.get()?, 200);
1693 assert_eq!(property2.get()?, 200);
1694 Ok(())
1695 }
1696
1697 // Observer tests
1698 #[tokio::test]
1699 async fn test_subscribe_and_notify() -> Result<(), PropertyError> {
1700 let property = ObservableProperty::new(0);
1701 let counter = Arc::new(AtomicUsize::new(0));
1702 let counter_clone = counter.clone();
1703
1704 property.subscribe(Arc::new(move |_, _| {
1705 counter_clone.fetch_add(1, Ordering::SeqCst);
1706 }))?;
1707
1708 property.set(1)?;
1709 property.set(2)?;
1710 property.set(3)?;
1711
1712 assert_eq!(counter.load(Ordering::SeqCst), 3);
1713 Ok(())
1714 }
1715
1716 #[tokio::test]
1717 async fn test_observer_count() -> Result<(), PropertyError> {
1718 let property = ObservableProperty::new(0);
1719 assert_eq!(property.observer_count(), 0);
1720
1721 let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1722 let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1723 assert_eq!(property.observer_count(), 2);
1724
1725 property.unsubscribe(id1)?;
1726 assert_eq!(property.observer_count(), 1);
1727
1728 property.unsubscribe(id2)?;
1729 assert_eq!(property.observer_count(), 0);
1730
1731 Ok(())
1732 }
1733
1734 #[tokio::test]
1735 async fn test_try_unsubscribe() -> Result<(), PropertyError> {
1736 let property = ObservableProperty::new(0);
1737 let id = property.subscribe(Arc::new(|_, _| {}))?;
1738
1739 assert!(property.try_unsubscribe(id));
1740 assert!(!property.try_unsubscribe(id));
1741
1742 Ok(())
1743 }
1744
1745 #[tokio::test]
1746 async fn test_subscribe_async() -> Result<(), PropertyError> {
1747 let property = ObservableProperty::new(0);
1748 let counter = Arc::new(AtomicUsize::new(0));
1749 let counter_clone = counter.clone();
1750
1751 property.subscribe_async(move |_, _| {
1752 let counter = counter_clone.clone();
1753 async move {
1754 // Simulate async work
1755 sleep(Duration::from_millis(10)).await;
1756 counter.fetch_add(1, Ordering::SeqCst);
1757 }
1758 })?;
1759
1760 property.set_async(1).await?;
1761 property.set_async(2).await?;
1762
1763 // Give time for async operations to complete
1764 sleep(Duration::from_millis(50)).await;
1765
1766 // Check counter after async operations complete
1767 assert_eq!(counter.load(Ordering::SeqCst), 2);
1768 Ok(())
1769 }
1770
1771 #[tokio::test]
1772 async fn test_subscribe_async_filtered() -> Result<(), PropertyError> {
1773 let property = ObservableProperty::new(0);
1774 let counter = Arc::new(AtomicUsize::new(0));
1775 let counter_clone = counter.clone();
1776
1777 property.subscribe_async_filtered(
1778 move |_, _| {
1779 let counter = counter_clone.clone();
1780 async move {
1781 sleep(Duration::from_millis(10)).await;
1782 counter.fetch_add(1, Ordering::SeqCst);
1783 }
1784 },
1785 |old, new| new > old
1786 )?;
1787
1788 property.set_async(10).await?; // Should trigger (0 -> 10)
1789 property.set_async(5).await?; // Should NOT trigger (10 -> 5)
1790 property.set_async(15).await?; // Should trigger (5 -> 15)
1791
1792 // Give time for async operations to complete
1793 sleep(Duration::from_millis(50)).await;
1794
1795 // Only two updates should have triggered the observer
1796 assert_eq!(counter.load(Ordering::SeqCst), 2);
1797 Ok(())
1798 }
1799
1800 #[tokio::test]
1801 async fn test_multiple_observers() -> Result<(), PropertyError> {
1802 let property = ObservableProperty::new(0);
1803 let counter1 = Arc::new(AtomicUsize::new(0));
1804 let counter2 = Arc::new(AtomicUsize::new(0));
1805
1806 property.subscribe(Arc::new({
1807 let counter = counter1.clone();
1808 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1809 }))?;
1810
1811 property.subscribe(Arc::new({
1812 let counter = counter2.clone();
1813 move |_, _| { counter.fetch_add(2, Ordering::SeqCst); }
1814 }))?;
1815
1816 property.set(42)?;
1817
1818 assert_eq!(counter1.load(Ordering::SeqCst), 1);
1819 assert_eq!(counter2.load(Ordering::SeqCst), 2);
1820 Ok(())
1821 }
1822
1823 #[tokio::test]
1824 async fn test_unsubscribe() -> Result<(), PropertyError> {
1825 let property = ObservableProperty::new(0);
1826 let counter = Arc::new(AtomicUsize::new(0));
1827
1828 let id = property.subscribe(Arc::new({
1829 let counter = counter.clone();
1830 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1831 }))?;
1832
1833 property.set(1)?;
1834 assert_eq!(counter.load(Ordering::SeqCst), 1);
1835
1836 // Unsubscribe and verify it no longer receives updates
1837 property.unsubscribe(id)?;
1838
1839 // Set again, counter should not increase
1840 property.set(2)?;
1841 assert_eq!(counter.load(Ordering::SeqCst), 1);
1842
1843 // Try to unsubscribe again, should fail with ObserverNotFound
1844 match property.unsubscribe(id) {
1845 Err(PropertyError::ObserverNotFound { .. }) => {},
1846 other => panic!("Expected ObserverNotFound error, got {:?}", other),
1847 }
1848
1849 Ok(())
1850 }
1851
1852 // Filtered observer tests
1853 #[tokio::test]
1854 async fn test_filtered_observer() -> Result<(), PropertyError> {
1855 let property = ObservableProperty::new(0);
1856 let counter = Arc::new(AtomicUsize::new(0));
1857
1858 property.subscribe_filtered(
1859 Arc::new({
1860 let counter = counter.clone();
1861 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1862 }),
1863 |old, new| new > old // Only trigger when value increases
1864 )?;
1865
1866 property.set(10)?; // Should trigger (0 -> 10)
1867 property.set(5)?; // Should NOT trigger (10 -> 5)
1868 property.set(15)?; // Should trigger (5 -> 15)
1869
1870 assert_eq!(counter.load(Ordering::SeqCst), 2);
1871 Ok(())
1872 }
1873
1874 // Concurrent access tests
1875 #[tokio::test]
1876 async fn test_concurrent_modifications() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1877 let property = Arc::new(ObservableProperty::new(0));
1878 let final_counter = Arc::new(AtomicUsize::new(0));
1879
1880 // Subscribe to track the final value
1881 property.subscribe(Arc::new({
1882 let counter = final_counter.clone();
1883 move |_, new| {
1884 counter.store(*new, Ordering::SeqCst);
1885 }
1886 }))?;
1887
1888 // Create multiple tasks to update the property concurrently
1889 let mut tasks = vec![];
1890
1891 for i in 1..=5 {
1892 let prop = property.clone();
1893 let task = tokio::spawn(async move {
1894 prop.set(i).map_err(|e| format!("Failed to set property: {}", e))
1895 });
1896 tasks.push(task);
1897 }
1898
1899 // Wait for all tasks to complete
1900 for task in tasks {
1901 task.await??;
1902 }
1903
1904 // Final value should be one of the set values (1-5)
1905 let final_value = final_counter.load(Ordering::SeqCst);
1906 assert!(final_value >= 1 && final_value <= 5);
1907 Ok(())
1908 }
1909
1910 // Test for observer panic handling
1911 #[tokio::test]
1912 async fn test_observer_panic_handling() -> Result<(), PropertyError> {
1913 let property = ObservableProperty::new(0);
1914 let counter = Arc::new(AtomicUsize::new(0));
1915
1916 // First observer panics
1917 property.subscribe(Arc::new(|_, _| {
1918 panic!("This observer intentionally panics");
1919 }))?;
1920
1921 // Second observer should still run
1922 property.subscribe(Arc::new({
1923 let counter = counter.clone();
1924 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1925 }))?;
1926
1927 // This should not panic the test
1928 property.set(42)?;
1929
1930 // Second observer should have run
1931 assert_eq!(counter.load(Ordering::SeqCst), 1);
1932 Ok(())
1933 }
1934
1935 // More tests for new functionality
1936 #[tokio::test]
1937 async fn test_async_observers_with_async_set() -> Result<(), PropertyError> {
1938 let property = ObservableProperty::new(0);
1939 let counter = Arc::new(AtomicUsize::new(0));
1940
1941 // Register two types of observers - one sync, one async
1942 property.subscribe(Arc::new({
1943 let counter = counter.clone();
1944 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1945 }))?;
1946
1947 let counter_clone = counter.clone();
1948 property.subscribe_async(move |_, _| {
1949 let counter = counter_clone.clone();
1950 async move {
1951 sleep(Duration::from_millis(10)).await;
1952 counter.fetch_add(1, Ordering::SeqCst);
1953 }
1954 })?;
1955
1956 // Using set_async should notify both observers
1957 property.set_async(42).await?;
1958
1959 // Give time for async observer to complete
1960 sleep(Duration::from_millis(50)).await;
1961
1962 // Both observers should have incremented the counter
1963 assert_eq!(counter.load(Ordering::SeqCst), 2);
1964 Ok(())
1965 }
1966
1967 // Test for stress with many observers
1968 #[tokio::test]
1969 async fn test_many_observers() -> Result<(), PropertyError> {
1970 let property = ObservableProperty::new(0);
1971 let counter = Arc::new(AtomicUsize::new(0));
1972
1973 // Add 100 observers
1974 for _ in 0..100 {
1975 property.subscribe(Arc::new({
1976 let counter = counter.clone();
1977 move |_, _| {
1978 counter.fetch_add(1, Ordering::SeqCst);
1979 }
1980 }))?;
1981 }
1982
1983 // Trigger all observers
1984 property.set_async(999).await?;
1985
1986 // Wait for all to complete
1987 sleep(Duration::from_millis(100)).await;
1988
1989 // All 100 observers should have incremented the counter
1990 assert_eq!(counter.load(Ordering::SeqCst), 100);
1991 Ok(())
1992 }
1993
1994 // Test for correct old and new values in observers
1995 #[tokio::test]
1996 async fn test_observer_receives_correct_values() -> Result<(), PropertyError> {
1997 let property = ObservableProperty::new(100);
1998 let vals = Arc::new((AtomicUsize::new(0), AtomicUsize::new(0)));
1999
2000 property.subscribe(Arc::new({
2001 let vals = vals.clone();
2002 move |old, new| {
2003 vals.0.store(*old, Ordering::SeqCst);
2004 vals.1.store(*new, Ordering::SeqCst);
2005 }
2006 }))?;
2007
2008 property.set(200)?;
2009
2010 assert_eq!(vals.0.load(Ordering::SeqCst), 100);
2011 assert_eq!(vals.1.load(Ordering::SeqCst), 200);
2012 Ok(())
2013 }
2014
2015 // Test for complex data type
    // Small struct payload used to exercise the property with a
    // non-primitive value type.
    #[derive(Debug, Clone, PartialEq)]
    struct Person {
        // Compared by the filter in `test_complex_data_type`.
        name: String,
        // Changed on its own in `test_complex_data_type` to check that the
        // name-based filter ignores it.
        age: u32,
    }
2021
2022 #[tokio::test]
2023 async fn test_complex_data_type() -> Result<(), PropertyError> {
2024 let person1 = Person {
2025 name: "Alice".to_string(),
2026 age: 30,
2027 };
2028
2029 let person2 = Person {
2030 name: "Bob".to_string(),
2031 age: 25,
2032 };
2033
2034 let property = ObservableProperty::new(person1.clone());
2035 assert_eq!(property.get()?, person1);
2036
2037 let name_changes = Arc::new(AtomicUsize::new(0));
2038
2039 property.subscribe_filtered(
2040 Arc::new({
2041 let counter = name_changes.clone();
2042 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2043 }),
2044 |old, new| old.name != new.name // Only notify on name changes
2045 )?;
2046
2047 // Update age only - shouldn't trigger
2048 let mut person3 = person1.clone();
2049 person3.age = 31;
2050 property.set(person3)?;
2051 assert_eq!(name_changes.load(Ordering::SeqCst), 0);
2052
2053 // Update name - should trigger
2054 property.set(person2)?;
2055 assert_eq!(name_changes.load(Ordering::SeqCst), 1);
2056 Ok(())
2057 }
2058
2059 // Test waiting for observers with proper async handling
2060 #[tokio::test]
2061 async fn test_waiting_for_observers() -> Result<(), PropertyError> {
2062 let property = ObservableProperty::new(0);
2063 let counter = Arc::new(AtomicUsize::new(0));
2064
2065 // Use subscribe_async instead of manually spawning tasks
2066 let counter_for_observer = counter.clone();
2067 property.subscribe_async(move |_, _| {
2068 let counter = counter_for_observer.clone();
2069 async move {
2070 sleep(Duration::from_millis(50)).await;
2071 counter.fetch_add(1, Ordering::SeqCst);
2072 }
2073 })?;
2074
2075 // Use the regular set_async method
2076 property.set_async(42).await?;
2077
2078 // Give sufficient time for async observers to complete
2079 sleep(Duration::from_millis(100)).await;
2080
2081 // Counter should be incremented after the async work completes
2082 assert_eq!(counter.load(Ordering::SeqCst), 1);
2083 Ok(())
2084 }
2085
2086 #[tokio::test]
2087 async fn test_subscription_auto_cleanup() -> Result<(), PropertyError> {
2088 let property = ObservableProperty::new(0);
2089 let counter = Arc::new(AtomicUsize::new(0));
2090
2091 {
2092 // Create a subscription in this scope
2093 let _subscription = property.subscribe_with_token(Arc::new({
2094 let counter = counter.clone();
2095 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2096 }))?;
2097
2098 // Update should trigger the observer
2099 property.set(1)?;
2100 assert_eq!(counter.load(Ordering::SeqCst), 1);
2101
2102 // Subscription is still active within this scope
2103 property.set(2)?;
2104 assert_eq!(counter.load(Ordering::SeqCst), 2);
2105 } // _subscription is dropped here, should automatically unsubscribe
2106
2107 // After subscription is dropped, updates should not trigger the observer
2108 property.set(3)?;
2109 assert_eq!(counter.load(Ordering::SeqCst), 2); // Counter should not increment
2110
2111 Ok(())
2112 }
2113
2114 #[tokio::test]
2115 async fn test_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
2116 let property = ObservableProperty::new(0);
2117 let counter = Arc::new(AtomicUsize::new(0));
2118
2119 // Only notify when value increases
2120 let filter = |old: &i32, new: &i32| new > old;
2121
2122 {
2123 // Create a filtered subscription in this scope
2124 let _subscription = property.subscribe_filtered_with_token(
2125 Arc::new({
2126 let counter = counter.clone();
2127 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2128 }),
2129 filter
2130 )?;
2131
2132 property.set(10)?; // Should trigger (0 -> 10)
2133 assert_eq!(counter.load(Ordering::SeqCst), 1);
2134
2135 property.set(5)?; // Should NOT trigger (10 -> 5)
2136 assert_eq!(counter.load(Ordering::SeqCst), 1);
2137
2138 property.set(15)?; // Should trigger (5 -> 15)
2139 assert_eq!(counter.load(Ordering::SeqCst), 2);
2140 } // _subscription is dropped here, should automatically unsubscribe
2141
2142 // After subscription is dropped, updates should not trigger the observer
2143 property.set(20)?;
2144 assert_eq!(counter.load(Ordering::SeqCst), 2); // Counter should not increment
2145
2146 Ok(())
2147 }
2148
2149 #[tokio::test]
2150 async fn test_async_subscription_auto_cleanup() -> Result<(), PropertyError> {
2151 let property = ObservableProperty::new(0);
2152 let counter = Arc::new(AtomicUsize::new(0));
2153 let counter_clone = counter.clone(); // Clone before passing to closure
2154
2155 {
2156 // Create an async subscription in this scope
2157 let _subscription = property.subscribe_async_with_token(move |_, _| {
2158 let counter = counter_clone.clone(); // Use counter_clone instead of counter
2159 async move {
2160 sleep(Duration::from_millis(10)).await;
2161 counter.fetch_add(1, Ordering::SeqCst);
2162 }
2163 })?;
2164
2165 property.set_async(1).await?;
2166
2167 // Give time for async operations to complete
2168 sleep(Duration::from_millis(50)).await;
2169 assert_eq!(counter.load(Ordering::SeqCst), 1);
2170 } // _subscription is dropped here, should automatically unsubscribe
2171
2172 // After subscription is dropped, updates should not trigger the observer
2173 property.set_async(2).await?;
2174
2175 // Give time for any potential async operations to complete
2176 sleep(Duration::from_millis(50)).await;
2177 assert_eq!(counter.load(Ordering::SeqCst), 1); // Counter should not increment
2178
2179 Ok(())
2180 }
2181
2182 #[tokio::test]
2183 async fn test_async_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
2184 let property = ObservableProperty::new(0);
2185 let counter = Arc::new(AtomicUsize::new(0));
2186 let counter_clone = counter.clone(); // Clone before passing to closure
2187
2188 {
2189 // Create an async filtered subscription in this scope
2190 let _subscription = property.subscribe_async_filtered_with_token(
2191 move |_, _| {
2192 let counter = counter_clone.clone(); // Use counter_clone instead of counter
2193 async move {
2194 sleep(Duration::from_millis(10)).await;
2195 counter.fetch_add(1, Ordering::SeqCst);
2196 }
2197 },
2198 |old, new| new > old // Only trigger when value increases
2199 )?;
2200
2201 property.set_async(10).await?; // Should trigger (0 -> 10)
2202
2203 // Give time for async operations to complete
2204 sleep(Duration::from_millis(50)).await;
2205 assert_eq!(counter.load(Ordering::SeqCst), 1);
2206
2207 property.set_async(5).await?; // Should NOT trigger (10 -> 5)
2208 sleep(Duration::from_millis(50)).await;
2209 assert_eq!(counter.load(Ordering::SeqCst), 1);
2210 } // _subscription is dropped here, should automatically unsubscribe
2211
2212 // After subscription is dropped, updates should not trigger the observer
2213 property.set_async(15).await?; // Would have triggered with active subscription
2214
2215 // Give time for any potential async operations to complete
2216 sleep(Duration::from_millis(50)).await;
2217 assert_eq!(counter.load(Ordering::SeqCst), 1); // Counter should not increment
2218
2219 Ok(())
2220 }
2221
2222 #[tokio::test]
2223 async fn test_multiple_subscriptions() -> Result<(), PropertyError> {
2224 let property = ObservableProperty::new(0);
2225 let counter1 = Arc::new(AtomicUsize::new(0));
2226 let counter2 = Arc::new(AtomicUsize::new(0));
2227
2228 // First subscription
2229 let subscription1 = property.subscribe_with_token(Arc::new({
2230 let counter = counter1.clone();
2231 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2232 }))?;
2233
2234 // Second subscription
2235 let subscription2 = property.subscribe_with_token(Arc::new({
2236 let counter = counter2.clone();
2237 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2238 }))?;
2239
2240 // Both subscriptions should receive updates
2241 property.set(1)?;
2242 assert_eq!(counter1.load(Ordering::SeqCst), 1);
2243 assert_eq!(counter2.load(Ordering::SeqCst), 1);
2244
2245 // Drop first subscription only
2246 drop(subscription1);
2247
2248 // Only the second subscription should receive updates now
2249 property.set(2)?;
2250 assert_eq!(counter1.load(Ordering::SeqCst), 1); // Should not increment
2251 assert_eq!(counter2.load(Ordering::SeqCst), 2); // Should increment
2252
2253 // Drop second subscription
2254 drop(subscription2);
2255
2256 // No subscriptions should receive updates now
2257 property.set(3)?;
2258 assert_eq!(counter1.load(Ordering::SeqCst), 1);
2259 assert_eq!(counter2.load(Ordering::SeqCst), 2);
2260
2261 Ok(())
2262 }
2263
2264 #[tokio::test]
2265 async fn test_subscription_with_property_drop() -> Result<(), PropertyError> {
2266 // Create property in a scope so it can be dropped
2267 let counter = Arc::new(AtomicUsize::new(0));
2268 let subscription;
2269
2270 {
2271 let property = ObservableProperty::new(0);
2272
2273 // Create subscription
2274 subscription = property.subscribe_with_token(Arc::new({
2275 let counter = counter.clone();
2276 move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2277 }))?;
2278
2279 // Subscription works normally
2280 property.set(1)?;
2281 assert_eq!(counter.load(Ordering::SeqCst), 1);
2282 } // property is dropped here, but subscription is still alive
2283
2284 // Subscription should be aware that property is gone when we drop it
2285 // This should not panic or cause any issues
2286 drop(subscription);
2287
2288 Ok(())
2289 }
2290
2291 // Test cleanup methods
2292 #[tokio::test]
2293 async fn test_cleanup_methods() -> Result<(), PropertyError> {
2294 let property = ObservableProperty::new(42);
2295 let counter = Arc::new(AtomicUsize::new(0));
2296
2297 // Subscribe multiple observers
2298 let counter1 = counter.clone();
2299 property.subscribe(Arc::new(move |_, _| {
2300 counter1.fetch_add(1, Ordering::SeqCst);
2301 }))?;
2302
2303 let counter2 = counter.clone();
2304 property.subscribe_async(move |_, _| {
2305 let counter = counter2.clone();
2306 async move {
2307 counter.fetch_add(1, Ordering::SeqCst);
2308 }
2309 })?;
2310
2311 assert_eq!(property.observer_count(), 2);
2312
2313 // Test clear_observers
2314 property.clear_observers()?;
2315 assert_eq!(property.observer_count(), 0);
2316
2317 // Setting value should not trigger any observers
2318 property.set(100)?;
2319 assert_eq!(counter.load(Ordering::SeqCst), 0);
2320
2321 // Re-subscribe to test shutdown
2322 let counter3 = counter.clone();
2323 property.subscribe(Arc::new(move |_, _| {
2324 counter3.fetch_add(1, Ordering::SeqCst);
2325 }))?;
2326
2327 assert_eq!(property.observer_count(), 1);
2328
2329 // Test shutdown method
2330 property.shutdown()?;
2331 assert_eq!(property.observer_count(), 0);
2332
2333 // Setting value should not trigger any observers after shutdown
2334 property.set(200)?;
2335 assert_eq!(counter.load(Ordering::SeqCst), 0);
2336
2337 Ok(())
2338 }
2339
2340 // Test backpressure and configuration functionality
2341 #[tokio::test]
2342 async fn test_property_config_default() {
2343 let config = PropertyConfig::default();
2344 assert_eq!(config.max_observers, 1000);
2345 assert_eq!(config.max_pending_notifications, 100);
2346 assert_eq!(config.observer_timeout_ms, 5000);
2347 }
2348
2349 #[tokio::test]
2350 async fn test_property_with_custom_config() -> Result<(), PropertyError> {
2351 let config = PropertyConfig {
2352 max_observers: 5,
2353 max_pending_notifications: 10,
2354 observer_timeout_ms: 1000,
2355 max_concurrent_async_tasks: 100,
2356 };
2357
2358 let property = ObservableProperty::new_with_config(42, config);
2359 assert_eq!(property.get()?, 42);
2360
2361 // Should be able to add up to max_observers
2362 for i in 0..5 {
2363 property.subscribe(Arc::new(move |_, _| {
2364 println!("Observer {}", i);
2365 }))?;
2366 }
2367
2368 assert_eq!(property.observer_count(), 5);
2369 Ok(())
2370 }
2371
2372 #[tokio::test]
2373 async fn test_observer_capacity_limit() -> Result<(), PropertyError> {
2374 let config = PropertyConfig {
2375 max_observers: 3,
2376 max_pending_notifications: 100,
2377 observer_timeout_ms: 5000,
2378 max_concurrent_async_tasks: 100,
2379 };
2380
2381 let property = ObservableProperty::new_with_config(0, config);
2382
2383 // Add observers up to limit
2384 property.subscribe(Arc::new(|_, _| {}))?;
2385 property.subscribe(Arc::new(|_, _| {}))?;
2386 property.subscribe(Arc::new(|_, _| {}))?;
2387
2388 assert_eq!(property.observer_count(), 3);
2389
2390 // Next subscribe should fail with CapacityExceeded
2391 let result = property.subscribe(Arc::new(|_, _| {}));
2392 assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2393
2394 if let Err(PropertyError::CapacityExceeded { current, max, resource }) = result {
2395 assert_eq!(current, 3);
2396 assert_eq!(max, 3);
2397 assert_eq!(resource, "observers");
2398 }
2399
2400 Ok(())
2401 }
2402
2403 #[tokio::test]
2404 async fn test_observer_capacity_after_unsubscribe() -> Result<(), PropertyError> {
2405 let config = PropertyConfig {
2406 max_observers: 2,
2407 max_pending_notifications: 100,
2408 observer_timeout_ms: 5000,
2409 max_concurrent_async_tasks: 100,
2410 };
2411
2412 let property = ObservableProperty::new_with_config(0, config);
2413
2414 // Add observers up to limit
2415 let id1 = property.subscribe(Arc::new(|_, _| {}))?;
2416 let _id2 = property.subscribe(Arc::new(|_, _| {}))?;
2417
2418 assert_eq!(property.observer_count(), 2);
2419
2420 // Next subscribe should fail
2421 assert!(property.subscribe(Arc::new(|_, _| {})).is_err());
2422
2423 // Unsubscribe one observer
2424 property.unsubscribe(id1)?;
2425 assert_eq!(property.observer_count(), 1);
2426
2427 // Now we should be able to add another observer
2428 let _id3 = property.subscribe(Arc::new(|_, _| {}))?;
2429 assert_eq!(property.observer_count(), 2);
2430
2431 // But not beyond the limit
2432 assert!(property.subscribe(Arc::new(|_, _| {})).is_err());
2433
2434 Ok(())
2435 }
2436
2437 #[tokio::test]
2438 async fn test_async_observer_capacity_limit() -> Result<(), PropertyError> {
2439 let config = PropertyConfig {
2440 max_observers: 2,
2441 max_pending_notifications: 100,
2442 observer_timeout_ms: 5000,
2443 max_concurrent_async_tasks: 100,
2444 };
2445
2446 let property = ObservableProperty::new_with_config(0, config);
2447
2448 // Add async observers up to limit
2449 property.subscribe_async(|_, _| async move {
2450 sleep(Duration::from_millis(10)).await;
2451 })?;
2452
2453 property.subscribe_async(|_, _| async move {
2454 sleep(Duration::from_millis(10)).await;
2455 })?;
2456
2457 assert_eq!(property.observer_count(), 2);
2458
2459 // Next subscribe should fail
2460 let result = property.subscribe_async(|_, _| async move {});
2461 assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2462
2463 Ok(())
2464 }
2465
2466 #[tokio::test]
2467 async fn test_filtered_observer_capacity_limit() -> Result<(), PropertyError> {
2468 let config = PropertyConfig {
2469 max_observers: 2,
2470 max_pending_notifications: 100,
2471 observer_timeout_ms: 5000,
2472 max_concurrent_async_tasks: 100,
2473 };
2474
2475 let property = ObservableProperty::new_with_config(0, config);
2476
2477 // Add filtered observers up to limit
2478 property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)?;
2479 property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)?;
2480
2481 assert_eq!(property.observer_count(), 2);
2482
2483 // Next subscribe should fail
2484 let result = property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true);
2485 assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2486
2487 Ok(())
2488 }
2489
2490 #[tokio::test]
2491 async fn test_capacity_error_diagnostic() {
2492 let error = PropertyError::CapacityExceeded {
2493 current: 100,
2494 max: 100,
2495 resource: "observers".to_string(),
2496 };
2497
2498 let diagnostic = error.diagnostic_info();
2499 assert!(diagnostic.contains("CAPACITY_EXCEEDED"));
2500 assert!(diagnostic.contains("resource=observers"));
2501 assert!(diagnostic.contains("current=100"));
2502 assert!(diagnostic.contains("max=100"));
2503 assert!(diagnostic.contains("utilization=100.0%"));
2504 }
2505
2506 #[tokio::test]
2507 async fn test_cloned_property_shares_config() -> Result<(), PropertyError> {
2508 let config = PropertyConfig {
2509 max_observers: 3,
2510 max_pending_notifications: 100,
2511 observer_timeout_ms: 5000,
2512 max_concurrent_async_tasks: 100,
2513 };
2514
2515 let property1 = ObservableProperty::new_with_config(0, config);
2516 let property2 = property1.clone();
2517
2518 // Add observers through both properties
2519 property1.subscribe(Arc::new(|_, _| {}))?;
2520 property2.subscribe(Arc::new(|_, _| {}))?;
2521 property1.subscribe(Arc::new(|_, _| {}))?;
2522
2523 // Both should show 3 observers since they share the same inner state
2524 assert_eq!(property1.observer_count(), 3);
2525 assert_eq!(property2.observer_count(), 3);
2526
2527 // Next subscribe should fail on either property
2528 assert!(property1.subscribe(Arc::new(|_, _| {})).is_err());
2529 assert!(property2.subscribe(Arc::new(|_, _| {})).is_err());
2530
2531 Ok(())
2532 }
2533
2534 #[tokio::test]
2535 async fn test_subscription_token_with_capacity() -> Result<(), PropertyError> {
2536 let config = PropertyConfig {
2537 max_observers: 2,
2538 max_pending_notifications: 100,
2539 observer_timeout_ms: 5000,
2540 max_concurrent_async_tasks: 100,
2541 };
2542
2543 let property = ObservableProperty::new_with_config(0, config);
2544
2545 // Create subscriptions with tokens
2546 let _sub1 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2547 let _sub2 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2548
2549 assert_eq!(property.observer_count(), 2);
2550
2551 // Next subscribe should fail
2552 let result = property.subscribe_with_token(Arc::new(|_, _| {}));
2553 assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2554
2555 // Drop one subscription
2556 drop(_sub1);
2557
2558 // Now we should be able to add another
2559 let _sub3 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2560 assert_eq!(property.observer_count(), 2);
2561
2562 Ok(())
2563 }
2564
2565 // Test error diagnostic functionality
2566 #[tokio::test]
2567 async fn test_error_diagnostic_info() {
2568 // Test ReadLockError
2569 let read_error = PropertyError::read_lock_error("get_value", "acquiring read lock failed");
2570 let diagnostic = read_error.diagnostic_info();
2571 assert!(diagnostic.contains("READ_LOCK_ERROR"));
2572 assert!(diagnostic.contains("operation=get_value"));
2573 assert!(diagnostic.contains("context=acquiring read lock failed"));
2574 assert!(diagnostic.contains("timestamp_ms="));
2575
2576 // Test WriteLockError
2577 let write_error = PropertyError::write_lock_error("set_value", "acquiring write lock failed");
2578 let diagnostic = write_error.diagnostic_info();
2579 assert!(diagnostic.contains("WRITE_LOCK_ERROR"));
2580 assert!(diagnostic.contains("operation=set_value"));
2581
2582 // Test LockPoisoned
2583 let poisoned_error = PropertyError::lock_poisoned("notify", "inner lock poisoned");
2584 let diagnostic = poisoned_error.diagnostic_info();
2585 assert!(diagnostic.contains("LOCK_POISONED"));
2586 assert!(diagnostic.contains("operation=notify"));
2587 assert!(diagnostic.contains("context=inner lock poisoned"));
2588
2589 // Test ObserverPanic
2590 let panic_error = PropertyError::observer_panic(ObserverId(42), "observer crashed");
2591 let diagnostic = panic_error.diagnostic_info();
2592 assert!(diagnostic.contains("OBSERVER_PANIC"));
2593 assert!(diagnostic.contains("observer_id=42"));
2594 assert!(diagnostic.contains("error=observer crashed"));
2595
2596 // Test ObserverNotFound
2597 let not_found_error = PropertyError::ObserverNotFound { id: ObserverId(99) };
2598 let diagnostic = not_found_error.diagnostic_info();
2599 assert!(diagnostic.contains("OBSERVER_NOT_FOUND"));
2600 assert!(diagnostic.contains("id=99"));
2601
2602 // Test CapacityExceeded
2603 let capacity_error = PropertyError::CapacityExceeded {
2604 current: 150,
2605 max: 100,
2606 resource: "observers".to_string(),
2607 };
2608 let diagnostic = capacity_error.diagnostic_info();
2609 assert!(diagnostic.contains("CAPACITY_EXCEEDED"));
2610 assert!(diagnostic.contains("resource=observers"));
2611 assert!(diagnostic.contains("current=150"));
2612 assert!(diagnostic.contains("max=100"));
2613 assert!(diagnostic.contains("utilization=150.0%"));
2614
2615 // Test OperationTimeout
2616 let timeout_error = PropertyError::OperationTimeout {
2617 operation: "notify_all".to_string(),
2618 elapsed_ms: 5500,
2619 threshold_ms: 5000,
2620 };
2621 let diagnostic = timeout_error.diagnostic_info();
2622 assert!(diagnostic.contains("OPERATION_TIMEOUT"));
2623 assert!(diagnostic.contains("operation=notify_all"));
2624 assert!(diagnostic.contains("elapsed_ms=5500"));
2625 assert!(diagnostic.contains("threshold_ms=5000"));
2626 assert!(diagnostic.contains("overage_ms=500"));
2627
2628 // Test ShutdownInProgress
2629 let shutdown_error = PropertyError::ShutdownInProgress;
2630 let diagnostic = shutdown_error.diagnostic_info();
2631 assert!(diagnostic.contains("SHUTDOWN_IN_PROGRESS"));
2632
2633 // Test ObserverError
2634 let observer_error = PropertyError::ObserverError {
2635 reason: "callback failed".to_string(),
2636 };
2637 let diagnostic = observer_error.diagnostic_info();
2638 assert!(diagnostic.contains("OBSERVER_ERROR"));
2639 assert!(diagnostic.contains("reason=callback failed"));
2640
2641 // Test TokioError
2642 let tokio_error = PropertyError::TokioError {
2643 reason: "runtime unavailable".to_string(),
2644 };
2645 let diagnostic = tokio_error.diagnostic_info();
2646 assert!(diagnostic.contains("TOKIO_ERROR"));
2647 assert!(diagnostic.contains("reason=runtime unavailable"));
2648
2649 // Test JoinError
2650 let join_error = PropertyError::JoinError("task panicked".to_string());
2651 let diagnostic = join_error.diagnostic_info();
2652 assert!(diagnostic.contains("JOIN_ERROR"));
2653 assert!(diagnostic.contains("message=task panicked"));
2654 }
2655
2656 #[tokio::test]
2657 async fn test_error_helper_functions_with_timestamp() {
2658 use std::time::{SystemTime, UNIX_EPOCH};
2659
2660 let before = SystemTime::now()
2661 .duration_since(UNIX_EPOCH)
2662 .unwrap()
2663 .as_millis() as u64;
2664
2665 // Create errors using helper functions
2666 let read_error = PropertyError::read_lock_error("test_op", "test_context");
2667 let write_error = PropertyError::write_lock_error("test_op", "test_context");
2668 let poisoned_error = PropertyError::lock_poisoned("test_op", "test_context");
2669 let panic_error = PropertyError::observer_panic(ObserverId(1), "test_panic");
2670
2671 let after = SystemTime::now()
2672 .duration_since(UNIX_EPOCH)
2673 .unwrap()
2674 .as_millis() as u64;
2675
2676 // Verify timestamps are reasonable (within the time window of test execution)
2677 match read_error {
2678 PropertyError::ReadLockError { timestamp_ms, .. } => {
2679 assert!(timestamp_ms >= before && timestamp_ms <= after);
2680 }
2681 _ => panic!("Expected ReadLockError"),
2682 }
2683
2684 match write_error {
2685 PropertyError::WriteLockError { timestamp_ms, .. } => {
2686 assert!(timestamp_ms >= before && timestamp_ms <= after);
2687 }
2688 _ => panic!("Expected WriteLockError"),
2689 }
2690
2691 match poisoned_error {
2692 PropertyError::LockPoisoned { timestamp_ms, .. } => {
2693 assert!(timestamp_ms >= before && timestamp_ms <= after);
2694 }
2695 _ => panic!("Expected LockPoisoned"),
2696 }
2697
2698 match panic_error {
2699 PropertyError::ObserverPanic { timestamp_ms, .. } => {
2700 assert!(timestamp_ms >= before && timestamp_ms <= after);
2701 }
2702 _ => panic!("Expected ObserverPanic"),
2703 }
2704 }
2705
2706 #[tokio::test]
2707 async fn test_error_display_formatting() {
2708 let timeout_error = PropertyError::OperationTimeout {
2709 operation: "test_operation".to_string(),
2710 elapsed_ms: 1500,
2711 threshold_ms: 1000,
2712 };
2713 let display = format!("{}", timeout_error);
2714 assert!(display.contains("test_operation"));
2715 assert!(display.contains("1500ms"));
2716 assert!(display.contains("1000ms"));
2717
2718 let capacity_error = PropertyError::CapacityExceeded {
2719 current: 200,
2720 max: 100,
2721 resource: "test_resource".to_string(),
2722 };
2723 let display = format!("{}", capacity_error);
2724 assert!(display.contains("200"));
2725 assert!(display.contains("100"));
2726 assert!(display.contains("test_resource"));
2727 }
2728
2729 #[tokio::test]
2730 async fn test_capacity_exceeded_utilization_calculation() {
2731 let error = PropertyError::CapacityExceeded {
2732 current: 75,
2733 max: 100,
2734 resource: "observers".to_string(),
2735 };
2736 let diagnostic = error.diagnostic_info();
2737 assert!(diagnostic.contains("utilization=75.0%"));
2738
2739 let error2 = PropertyError::CapacityExceeded {
2740 current: 100,
2741 max: 100,
2742 resource: "observers".to_string(),
2743 };
2744 let diagnostic2 = error2.diagnostic_info();
2745 assert!(diagnostic2.contains("utilization=100.0%"));
2746 }
2747
2748 // Test graceful shutdown functionality
2749 #[tokio::test]
2750 async fn test_shutdown_with_timeout_basic() -> Result<(), PropertyError> {
2751 use std::time::Duration;
2752
2753 let property = ObservableProperty::new(0);
2754 let counter = Arc::new(AtomicUsize::new(0));
2755
2756 // Add some observers
2757 let counter1 = counter.clone();
2758 property.subscribe(Arc::new(move |_, _| {
2759 counter1.fetch_add(1, Ordering::SeqCst);
2760 }))?;
2761
2762 let counter2 = counter.clone();
2763 property.subscribe_async(move |_, _| {
2764 let counter = counter2.clone();
2765 async move {
2766 counter.fetch_add(1, Ordering::SeqCst);
2767 }
2768 })?;
2769
2770 assert_eq!(property.observer_count(), 2);
2771
2772 // Perform shutdown with timeout
2773 let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
2774
2775 // Verify report
2776 assert_eq!(report.observers_cleared, 2);
2777 assert!(report.shutdown_duration.as_secs() < 5);
2778 assert!(report.completed_within_timeout);
2779 assert!(report.initiated_at_ms > 0);
2780
2781 // Verify observers were cleared
2782 assert_eq!(property.observer_count(), 0);
2783
2784 // Setting value should not trigger observers
2785 property.set(42)?;
2786 assert_eq!(counter.load(Ordering::SeqCst), 0);
2787
2788 Ok(())
2789 }
2790
2791 #[tokio::test]
2792 async fn test_shutdown_report_diagnostic() -> Result<(), PropertyError> {
2793 use std::time::Duration;
2794
2795 let property = ObservableProperty::new(100);
2796
2797 // Add multiple observers
2798 for _ in 0..5 {
2799 property.subscribe(Arc::new(|_, _| {}))?;
2800 }
2801
2802 let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2803
2804 let diagnostic = report.diagnostic_info();
2805 assert!(diagnostic.contains("SHUTDOWN_COMPLETE"));
2806 assert!(diagnostic.contains("observers_cleared=5"));
2807 assert!(diagnostic.contains("within_timeout=true"));
2808 assert!(diagnostic.contains("initiated_at_ms="));
2809
2810 Ok(())
2811 }
2812
2813 #[tokio::test]
2814 async fn test_shutdown_with_async_observers() -> Result<(), PropertyError> {
2815 use std::time::Duration;
2816
2817 let property = ObservableProperty::new(0);
2818 let counter = Arc::new(AtomicUsize::new(0));
2819
2820 // Add async observers that take some time
2821 let counter1 = counter.clone();
2822 property.subscribe_async(move |_, _| {
2823 let counter = counter1.clone();
2824 async move {
2825 tokio::time::sleep(Duration::from_millis(50)).await;
2826 counter.fetch_add(1, Ordering::SeqCst);
2827 }
2828 })?;
2829
2830 let counter2 = counter.clone();
2831 property.subscribe_async(move |_, _| {
2832 let counter = counter2.clone();
2833 async move {
2834 tokio::time::sleep(Duration::from_millis(50)).await;
2835 counter.fetch_add(1, Ordering::SeqCst);
2836 }
2837 })?;
2838
2839 // Trigger observers
2840 property.set_async(42).await?;
2841
2842 // Give time for async operations to start
2843 tokio::time::sleep(Duration::from_millis(10)).await;
2844
2845 // Shutdown with enough timeout for operations to complete
2846 let report = property.shutdown_with_timeout(Duration::from_secs(2)).await?;
2847
2848 assert_eq!(report.observers_cleared, 2);
2849 assert!(report.completed_within_timeout);
2850
2851 Ok(())
2852 }
2853
2854 #[tokio::test]
2855 async fn test_shutdown_idempotent() -> Result<(), PropertyError> {
2856 use std::time::Duration;
2857
2858 let property = ObservableProperty::new("test");
2859
2860 property.subscribe(Arc::new(|_, _| {}))?;
2861 property.subscribe(Arc::new(|_, _| {}))?;
2862
2863 // First shutdown
2864 let report1 = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2865 assert_eq!(report1.observers_cleared, 2);
2866
2867 // Second shutdown should still succeed but clear 0 observers
2868 let report2 = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2869 assert_eq!(report2.observers_cleared, 0);
2870
2871 Ok(())
2872 }
2873
2874 #[tokio::test]
2875 async fn test_shutdown_vs_shutdown_with_timeout() -> Result<(), PropertyError> {
2876 use std::time::Duration;
2877
2878 // Test regular shutdown
2879 let property1 = ObservableProperty::new(0);
2880 property1.subscribe(Arc::new(|_, _| {}))?;
2881 property1.subscribe(Arc::new(|_, _| {}))?;
2882
2883 property1.shutdown()?;
2884 assert_eq!(property1.observer_count(), 0);
2885
2886 // Test shutdown with timeout
2887 let property2 = ObservableProperty::new(0);
2888 property2.subscribe(Arc::new(|_, _| {}))?;
2889 property2.subscribe(Arc::new(|_, _| {}))?;
2890
2891 let report = property2.shutdown_with_timeout(Duration::from_secs(1)).await?;
2892 assert_eq!(property2.observer_count(), 0);
2893 assert_eq!(report.observers_cleared, 2);
2894
2895 Ok(())
2896 }
2897
2898 #[tokio::test]
2899 async fn test_shutdown_report_timing() -> Result<(), PropertyError> {
2900 use std::time::{Duration, Instant};
2901
2902 let property = ObservableProperty::new(0);
2903
2904 // Add several observers
2905 for _ in 0..10 {
2906 property.subscribe(Arc::new(|_, _| {}))?;
2907 }
2908
2909 let start = Instant::now();
2910 let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2911 let elapsed = start.elapsed();
2912
2913 // Shutdown should complete reasonably quickly
2914 assert!(elapsed < Duration::from_secs(2));
2915 assert!(report.shutdown_duration <= elapsed);
2916 assert_eq!(report.observers_cleared, 10);
2917
2918 Ok(())
2919 }
2920
2921 #[tokio::test]
2922 async fn test_shutdown_with_filtered_and_async_observers() -> Result<(), PropertyError> {
2923 use std::time::Duration;
2924
2925 let property = ObservableProperty::new(0);
2926
2927 // Mix of different observer types
2928 property.subscribe(Arc::new(|_, _| {}))?;
2929 property.subscribe_async(|_, _| async {})?;
2930 property.subscribe_filtered(Arc::new(|_, _| {}), |_, new| new % 2 == 0)?;
2931 property.subscribe_async_filtered(|_, _| async {}, |_, new| new > &0)?;
2932
2933 assert_eq!(property.observer_count(), 4);
2934
2935 let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2936
2937 assert_eq!(report.observers_cleared, 4);
2938 assert_eq!(property.observer_count(), 0);
2939
2940 Ok(())
2941 }
2942
2943 // Batching tests
2944 #[tokio::test]
2945 async fn test_batched_property_creation() -> Result<(), PropertyError> {
2946 let property = BatchedProperty::new(42);
2947 assert_eq!(property.get()?, 42);
2948
2949 let config = BatchConfig {
2950 batch_interval: std::time::Duration::from_millis(50),
2951 };
2952 let property2 = BatchedProperty::new_with_config(100, config);
2953 assert_eq!(property2.get()?, 100);
2954
2955 Ok(())
2956 }
2957
2958 #[tokio::test]
2959 async fn test_batched_property_queue_update() -> Result<(), PropertyError> {
2960 use std::time::Duration;
2961
2962 let property = BatchedProperty::new(0);
2963 let counter = Arc::new(AtomicUsize::new(0));
2964 let last_value = Arc::new(RwLock::new(0));
2965
2966 // Subscribe to updates
2967 property.subscribe(Arc::new({
2968 let counter = counter.clone();
2969 let last_value = last_value.clone();
2970 move |_, new| {
2971 counter.fetch_add(1, Ordering::SeqCst);
2972 *last_value.write() = *new;
2973 }
2974 }))?;
2975
2976 // Queue multiple updates rapidly
2977 for i in 1..=10 {
2978 property.queue_update(i)?;
2979 }
2980
2981 // Wait for batch to flush (default is 100ms)
2982 tokio::time::sleep(Duration::from_millis(150)).await;
2983
2984 // Should have been notified only once with the last value
2985 assert_eq!(counter.load(Ordering::SeqCst), 1);
2986 assert_eq!(*last_value.read(), 10);
2987 assert_eq!(property.get()?, 10);
2988
2989 Ok(())
2990 }
2991
2992 #[tokio::test]
2993 async fn test_batched_property_multiple_batches() -> Result<(), PropertyError> {
2994 use std::time::Duration;
2995
2996 let config = BatchConfig {
2997 batch_interval: Duration::from_millis(50),
2998 };
2999 let property = BatchedProperty::new_with_config(0, config);
3000 let counter = Arc::new(AtomicUsize::new(0));
3001
3002 property.subscribe(Arc::new({
3003 let counter = counter.clone();
3004 move |_, _| {
3005 counter.fetch_add(1, Ordering::SeqCst);
3006 }
3007 }))?;
3008
3009 // First batch
3010 for i in 1..=5 {
3011 property.queue_update(i)?;
3012 }
3013 tokio::time::sleep(Duration::from_millis(75)).await;
3014
3015 // Second batch
3016 for i in 6..=10 {
3017 property.queue_update(i)?;
3018 }
3019 tokio::time::sleep(Duration::from_millis(75)).await;
3020
3021 // Should have been notified twice (once per batch)
3022 assert_eq!(counter.load(Ordering::SeqCst), 2);
3023 assert_eq!(property.get()?, 10);
3024
3025 Ok(())
3026 }
3027
3028 #[tokio::test]
3029 async fn test_batched_property_set_immediate() -> Result<(), PropertyError> {
3030 use std::time::Duration;
3031
3032 let property = BatchedProperty::new(0);
3033 let counter = Arc::new(AtomicUsize::new(0));
3034
3035 property.subscribe(Arc::new({
3036 let counter = counter.clone();
3037 move |_, _| {
3038 counter.fetch_add(1, Ordering::SeqCst);
3039 }
3040 }))?;
3041
3042 // Queue some updates
3043 property.queue_update(5)?;
3044 property.queue_update(10)?;
3045
3046 // Set immediately - should notify right away
3047 property.set_immediate(42)?;
3048 assert_eq!(counter.load(Ordering::SeqCst), 1);
3049 assert_eq!(property.get()?, 42);
3050
3051 // Wait for batch - queued updates should be cleared by set_immediate
3052 tokio::time::sleep(Duration::from_millis(150)).await;
3053
3054 // Counter should still be 1 (no additional notification from batch)
3055 // Note: This behavior depends on timing, but set_immediate should have priority
3056 assert!(counter.load(Ordering::SeqCst) >= 1);
3057
3058 Ok(())
3059 }
3060
3061 #[tokio::test]
3062 async fn test_batched_property_flush() -> Result<(), PropertyError> {
3063 let property = BatchedProperty::new(0);
3064 let counter = Arc::new(AtomicUsize::new(0));
3065
3066 property.subscribe(Arc::new({
3067 let counter = counter.clone();
3068 move |_, _| {
3069 counter.fetch_add(1, Ordering::SeqCst);
3070 }
3071 }))?;
3072
3073 // Queue updates
3074 property.queue_update(42)?;
3075
3076 // Flush immediately
3077 property.flush().await?;
3078
3079 // Should be notified immediately
3080 assert_eq!(counter.load(Ordering::SeqCst), 1);
3081 assert_eq!(property.get()?, 42);
3082
3083 Ok(())
3084 }
3085
3086 #[tokio::test]
3087 async fn test_batched_property_async_observer() -> Result<(), PropertyError> {
3088 use std::time::Duration;
3089
3090 let property = BatchedProperty::new(0);
3091 let counter = Arc::new(AtomicUsize::new(0));
3092
3093 property.subscribe_async({
3094 let counter = counter.clone();
3095 move |_, _| {
3096 let counter = counter.clone();
3097 async move {
3098 tokio::time::sleep(Duration::from_millis(10)).await;
3099 counter.fetch_add(1, Ordering::SeqCst);
3100 }
3101 }
3102 })?;
3103
3104 // Queue multiple updates
3105 for i in 1..=5 {
3106 property.queue_update(i)?;
3107 }
3108
3109 // Wait for batch and async processing
3110 tokio::time::sleep(Duration::from_millis(200)).await;
3111
3112 // Should have been notified once
3113 assert_eq!(counter.load(Ordering::SeqCst), 1);
3114
3115 Ok(())
3116 }
3117
3118 #[tokio::test]
3119 async fn test_batched_property_observer_management() -> Result<(), PropertyError> {
3120 let property = BatchedProperty::new(0);
3121
3122 // Add observers
3123 let id1 = property.subscribe(Arc::new(|_, _| {}))?;
3124 let _id2 = property.subscribe(Arc::new(|_, _| {}))?;
3125 assert_eq!(property.observer_count(), 2);
3126
3127 // Unsubscribe one
3128 property.unsubscribe(id1)?;
3129 assert_eq!(property.observer_count(), 1);
3130
3131 // Clear all
3132 property.clear_observers()?;
3133 assert_eq!(property.observer_count(), 0);
3134
3135 Ok(())
3136 }
3137
3138 #[tokio::test]
3139 async fn test_batched_property_clone() -> Result<(), PropertyError> {
3140 use std::time::Duration;
3141
3142 let property1 = BatchedProperty::new(0);
3143 let property2 = property1.clone();
3144
3145 let counter = Arc::new(AtomicUsize::new(0));
3146
3147 // Subscribe on clone
3148 property2.subscribe(Arc::new({
3149 let counter = counter.clone();
3150 move |_, _| {
3151 counter.fetch_add(1, Ordering::SeqCst);
3152 }
3153 }))?;
3154
3155 // Update on original
3156 property1.queue_update(42)?;
3157
3158 // Wait for batch
3159 tokio::time::sleep(Duration::from_millis(150)).await;
3160
3161 // Observer should be notified
3162 assert_eq!(counter.load(Ordering::SeqCst), 1);
3163 assert_eq!(property1.get()?, 42);
3164 assert_eq!(property2.get()?, 42);
3165
3166 Ok(())
3167 }
3168
3169 #[tokio::test]
3170 async fn test_batched_property_high_frequency() -> Result<(), PropertyError> {
3171 use std::time::Duration;
3172
3173 let config = BatchConfig {
3174 batch_interval: Duration::from_millis(100),
3175 };
3176 let property = BatchedProperty::new_with_config(0, config);
3177 let counter = Arc::new(AtomicUsize::new(0));
3178
3179 property.subscribe(Arc::new({
3180 let counter = counter.clone();
3181 move |_, _| {
3182 counter.fetch_add(1, Ordering::SeqCst);
3183 }
3184 }))?;
3185
3186 // Queue 1000 updates rapidly
3187 for i in 1..=1000 {
3188 property.queue_update(i)?;
3189 }
3190
3191 // Wait for batch
3192 tokio::time::sleep(Duration::from_millis(150)).await;
3193
3194 // Should have been notified only once despite 1000 updates
3195 assert_eq!(counter.load(Ordering::SeqCst), 1);
3196 assert_eq!(property.get()?, 1000);
3197
3198 Ok(())
3199 }
3200}
3201
/// Settings for a batched property.
///
/// The only knob is the interval at which queued updates are flushed to
/// observers; it trades notification overhead against update latency.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::BatchConfig;
/// use std::time::Duration;
///
/// let config = BatchConfig {
///     batch_interval: Duration::from_millis(100),
/// };
/// ```
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Time between two flushes of batched updates to observers.
    ///
    /// Longer intervals mean fewer notifications but higher latency;
    /// shorter intervals mean faster updates at the cost of more overhead.
    ///
    /// Default: 100ms
    pub batch_interval: std::time::Duration,
}
3227
3228impl Default for BatchConfig {
3229 fn default() -> Self {
3230 Self {
3231 batch_interval: std::time::Duration::from_millis(100),
3232 }
3233 }
3234}
3235
3236/// A batched wrapper around ObservableProperty that reduces notification overhead
3237///
3238/// This type collects property updates and only notifies observers at regular intervals,
3239/// which is useful for high-frequency update scenarios where you want to reduce the
3240/// number of observer notifications.
3241///
3242/// # Examples
3243///
3244/// ```
3245/// use observable_property_tokio::{BatchedProperty, BatchConfig};
3246/// use std::time::Duration;
3247/// use std::sync::Arc;
3248///
3249/// #[tokio::main]
3250/// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3251/// // Create a batched property with 100ms batching interval
3252/// let config = BatchConfig {
3253/// batch_interval: Duration::from_millis(100),
3254/// };
3255///
3256/// let property = BatchedProperty::new_with_config(0, config);
3257///
3258/// // Subscribe to batched updates
3259/// property.subscribe(Arc::new(|old, new| {
3260/// println!("Batched update: {} -> {}", old, new);
3261/// }))?;
3262///
3263/// // Queue multiple updates rapidly
3264/// for i in 1..=100 {
3265/// property.queue_update(i)?;
3266/// }
3267///
3268/// // Observers will only be notified once per batch interval
3269/// // with the latest value
3270///
3271/// // Wait for batch to flush
3272/// tokio::time::sleep(Duration::from_millis(150)).await;
3273///
3274/// Ok(())
3275/// }
3276/// ```
3277pub struct BatchedProperty<T: Clone + Send + Sync + 'static> {
3278 inner: ObservableProperty<T>,
3279 pending_update: Arc<RwLock<Option<T>>>,
3280 _batch_task: Arc<tokio::task::JoinHandle<()>>,
3281}
3282
3283impl<T: Clone + Send + Sync + 'static> BatchedProperty<T> {
3284 /// Create a new batched property with default configuration
3285 ///
3286 /// Uses a batch interval of 100ms by default.
3287 ///
3288 /// # Arguments
3289 ///
3290 /// * `initial_value` - The starting value for the property
3291 ///
3292 /// # Examples
3293 ///
3294 /// ```
3295 /// use observable_property_tokio::BatchedProperty;
3296 ///
3297 /// #[tokio::main]
3298 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3299 /// let property = BatchedProperty::new(42);
3300 /// assert_eq!(property.get()?, 42);
3301 /// Ok(())
3302 /// }
3303 /// ```
3304 pub fn new(initial_value: T) -> Self {
3305 Self::new_with_config(initial_value, BatchConfig::default())
3306 }
3307
3308 /// Create a new batched property with custom configuration
3309 ///
3310 /// # Arguments
3311 ///
3312 /// * `initial_value` - The starting value for the property
3313 /// * `config` - Batch configuration controlling flush interval
3314 ///
3315 /// # Examples
3316 ///
3317 /// ```
3318 /// use observable_property_tokio::{BatchedProperty, BatchConfig};
3319 /// use std::time::Duration;
3320 ///
3321 /// #[tokio::main]
3322 /// async fn main() {
3323 /// let config = BatchConfig {
3324 /// batch_interval: Duration::from_millis(50),
3325 /// };
3326 ///
3327 /// let property = BatchedProperty::new_with_config(0, config);
3328 /// }
3329 /// ```
3330 pub fn new_with_config(initial_value: T, config: BatchConfig) -> Self {
3331 let inner = ObservableProperty::new(initial_value);
3332 let pending_update = Arc::new(RwLock::new(None));
3333
3334 // Spawn batch processor task
3335 let inner_clone = inner.clone();
3336 let pending_clone = pending_update.clone();
3337 let batch_interval = config.batch_interval;
3338
3339 let batch_task = tokio::spawn(async move {
3340 let mut interval = tokio::time::interval(batch_interval);
3341 loop {
3342 interval.tick().await;
3343
3344 // Check if there's a pending update
3345 let update = {
3346 let mut pending = pending_clone.write();
3347 pending.take()
3348 };
3349
3350 // If there's an update, apply it
3351 if let Some(value) = update {
3352 let _ = inner_clone.set_async(value).await;
3353 }
3354 }
3355 });
3356
3357 Self {
3358 inner,
3359 pending_update,
3360 _batch_task: Arc::new(batch_task),
3361 }
3362 }
3363
3364 /// Queue an update to be batched
3365 ///
3366 /// The update will be held until the next batch interval, at which point
3367 /// only the most recent queued value will be applied and observers notified.
3368 ///
3369 /// # Arguments
3370 ///
3371 /// * `value` - The new value to queue
3372 ///
3373 /// # Returns
3374 ///
3375 /// `Ok(())` if the update was queued successfully
3376 ///
3377 /// # Examples
3378 ///
3379 /// ```
3380 /// use observable_property_tokio::BatchedProperty;
3381 /// use std::time::Duration;
3382 ///
3383 /// #[tokio::main]
3384 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3385 /// let property = BatchedProperty::new(0);
3386 ///
3387 /// // Queue multiple updates
3388 /// for i in 1..=10 {
3389 /// property.queue_update(i)?;
3390 /// }
3391 ///
3392 /// // Wait for batch to flush
3393 /// tokio::time::sleep(Duration::from_millis(150)).await;
3394 ///
3395 /// // Property will have the last queued value
3396 /// assert_eq!(property.get()?, 10);
3397 ///
3398 /// Ok(())
3399 /// }
3400 /// ```
3401 pub fn queue_update(&self, value: T) -> Result<(), PropertyError> {
3402 *self.pending_update.write() = Some(value);
3403 Ok(())
3404 }
3405
3406 /// Set the value immediately, bypassing batching
3407 ///
3408 /// This will apply the update immediately and notify observers synchronously,
3409 /// without waiting for the next batch interval.
3410 ///
3411 /// # Arguments
3412 ///
3413 /// * `value` - The new value to set
3414 ///
3415 /// # Examples
3416 ///
3417 /// ```
3418 /// use observable_property_tokio::BatchedProperty;
3419 ///
3420 /// # #[tokio::main]
3421 /// # async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3422 /// let property = BatchedProperty::new(0);
3423 /// property.set_immediate(42)?;
3424 /// assert_eq!(property.get()?, 42);
3425 /// # Ok(())
3426 /// # }
3427 /// ```
3428 pub fn set_immediate(&self, value: T) -> Result<(), PropertyError> {
3429 self.inner.set(value)
3430 }
3431
3432 /// Set the value immediately using async notification
3433 ///
3434 /// This will apply the update immediately and notify observers asynchronously,
3435 /// without waiting for the next batch interval.
3436 ///
3437 /// # Arguments
3438 ///
3439 /// * `value` - The new value to set
3440 pub async fn set_immediate_async(&self, value: T) -> Result<(), PropertyError> {
3441 self.inner.set_async(value).await
3442 }
3443
3444 /// Get the current value
3445 ///
3446 /// # Examples
3447 ///
3448 /// ```
3449 /// use observable_property_tokio::BatchedProperty;
3450 ///
3451 /// #[tokio::main]
3452 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3453 /// let property = BatchedProperty::new(42);
3454 /// assert_eq!(property.get()?, 42);
3455 /// Ok(())
3456 /// }
3457 /// ```
3458 pub fn get(&self) -> Result<T, PropertyError> {
3459 self.inner.get()
3460 }
3461
3462 /// Subscribe to batched property changes
3463 ///
3464 /// Observers will be notified at batch intervals with the latest value.
3465 ///
3466 /// # Arguments
3467 ///
3468 /// * `observer` - Callback function to handle property changes
3469 ///
3470 /// # Returns
3471 ///
3472 /// `Ok(ObserverId)` containing a unique identifier for this observer
3473 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
3474 self.inner.subscribe(observer)
3475 }
3476
3477 /// Subscribe with an async handler
3478 pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
3479 where
3480 F: Fn(T, T) -> Fut + Send + Sync + 'static,
3481 Fut: std::future::Future<Output = ()> + Send + 'static,
3482 {
3483 self.inner.subscribe_async(handler)
3484 }
3485
3486 /// Unsubscribe an observer
3487 pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
3488 self.inner.unsubscribe(id)
3489 }
3490
3491 /// Get the number of registered observers
3492 pub fn observer_count(&self) -> usize {
3493 self.inner.observer_count()
3494 }
3495
3496 /// Clear all observers
3497 pub fn clear_observers(&self) -> Result<(), PropertyError> {
3498 self.inner.clear_observers()
3499 }
3500
3501 /// Flush any pending batched update immediately
3502 ///
3503 /// This forces any queued update to be applied right away,
3504 /// rather than waiting for the next batch interval.
3505 ///
3506 /// # Examples
3507 ///
3508 /// ```
3509 /// use observable_property_tokio::BatchedProperty;
3510 ///
3511 /// # #[tokio::main]
3512 /// # async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3513 /// let property = BatchedProperty::new(0);
3514 /// property.queue_update(42)?;
3515 /// property.flush().await?;
3516 /// assert_eq!(property.get()?, 42);
3517 /// # Ok(())
3518 /// # }
3519 /// ```
3520 pub async fn flush(&self) -> Result<(), PropertyError> {
3521 let update = {
3522 let mut pending = self.pending_update.write();
3523 pending.take()
3524 };
3525
3526 if let Some(value) = update {
3527 self.inner.set_async(value).await?;
3528 }
3529
3530 Ok(())
3531 }
3532}
3533
3534impl<T: Clone + Send + Sync + 'static> Clone for BatchedProperty<T> {
3535 fn clone(&self) -> Self {
3536 Self {
3537 inner: self.inner.clone(),
3538 pending_update: Arc::clone(&self.pending_update),
3539 _batch_task: Arc::clone(&self._batch_task),
3540 }
3541 }
3542}
3543
3544impl From<JoinError> for PropertyError {
3545 /// Convert a Tokio JoinError into a PropertyError
3546 ///
3547 /// This enables using the `?` operator directly on `task::spawn(...).await`
3548 /// which returns a `Result<T, JoinError>`.
3549 ///
3550 /// # Examples
3551 ///
3552 /// ```rust
3553 /// use observable_property_tokio::ObservableProperty;
3554 /// use std::sync::Arc;
3555 /// use tokio::task;
3556 ///
3557 /// #[tokio::main]
3558 /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3559 /// let property = Arc::new(ObservableProperty::new(0));
3560 /// let property_clone = property.clone();
3561 ///
3562 /// // This task::spawn can now use ?? to propagate both types of errors
3563 /// task::spawn(async move {
3564 /// property_clone.set(42)?;
3565 /// Ok::<_, observable_property_tokio::PropertyError>(())
3566 /// }).await??;
3567 ///
3568 /// Ok(())
3569 /// }
3570 /// ```
3571 fn from(err: JoinError) -> Self {
3572 PropertyError::JoinError(err.to_string())
3573 }
3574}
3575
#[cfg(test)]
mod connection_pool_tests {
    //! Tests for the limit on concurrently running async observer tasks
    //! (`PropertyConfig::max_concurrent_async_tasks`).

    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::{sleep, Duration};

    /// With a limit of 5, no more than 5 of 20 slow async observers may be
    /// running at the same moment.
    #[tokio::test]
    async fn test_concurrent_task_limiting() -> Result<(), PropertyError> {
        let property = ObservableProperty::new_with_config(
            0,
            PropertyConfig {
                max_observers: 1000,
                max_pending_notifications: 1000,
                observer_timeout_ms: 5000,
                // At most 5 async observer tasks at once
                max_concurrent_async_tasks: 5,
            },
        );
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        // 20 async observers, each busy for 100ms
        for _ in 0..20 {
            let in_flight = in_flight.clone();
            let peak = peak.clone();

            property.subscribe_async(move |_, _| {
                let in_flight = in_flight.clone();
                let peak = peak.clone();

                async move {
                    // Enter: bump the running count and record a new peak if reached
                    let now_running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now_running, Ordering::SeqCst);

                    // Simulated work
                    sleep(Duration::from_millis(100)).await;

                    // Leave
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                }
            })?;
        }

        // Notify every observer, then leave time for all of them to finish
        property.set_async(42).await?;
        sleep(Duration::from_millis(500)).await;

        // The observed peak must stay within the configured limit
        let max_reached = peak.load(Ordering::SeqCst);
        println!("Max concurrent tasks: {} (limit: 5)", max_reached);

        assert!(
            max_reached <= 5,
            "Expected max concurrent tasks <= 5, but got {}",
            max_reached
        );

        Ok(())
    }

    /// With a limit of 2, all 5 observers must still start and finish.
    #[tokio::test]
    async fn test_semaphore_blocks_when_max_reached() -> Result<(), PropertyError> {
        let property = ObservableProperty::new_with_config(
            0,
            PropertyConfig {
                max_observers: 100,
                max_pending_notifications: 100,
                observer_timeout_ms: 5000,
                // Deliberately tiny limit
                max_concurrent_async_tasks: 2,
            },
        );
        let execution_order = Arc::new(parking_lot::RwLock::new(Vec::new()));

        // 5 observers that log when they start and when they end
        for i in 0..5 {
            let shared_log = execution_order.clone();
            property.subscribe_async(move |_, _| {
                let log = shared_log.clone();
                async move {
                    log.write().push((i, "start"));
                    sleep(Duration::from_millis(50)).await;
                    log.write().push((i, "end"));
                }
            })?;
        }

        // Notify, then leave time for every observer to finish
        property.set_async(100).await?;
        sleep(Duration::from_millis(300)).await;

        let order = execution_order.read();
        println!("Execution order: {:?}", *order);

        // Ten events in total...
        assert_eq!(order.len(), 10, "Should have 5 start and 5 end events");

        // ...five of each kind
        let starts = order.iter().filter(|event| event.1 == "start").count();
        let ends = order.iter().filter(|event| event.1 == "end").count();
        assert_eq!(starts, 5, "Should have 5 starts");
        assert_eq!(ends, 5, "Should have 5 ends");

        Ok(())
    }

    /// Repeated notifications must keep reaching all observers, so the
    /// concurrency limit has to become available again after each round.
    #[tokio::test]
    async fn test_permits_released_after_execution() -> Result<(), PropertyError> {
        let property = ObservableProperty::new_with_config(
            0,
            PropertyConfig {
                max_observers: 100,
                max_pending_notifications: 100,
                observer_timeout_ms: 5000,
                max_concurrent_async_tasks: 3,
            },
        );
        let executions = Arc::new(AtomicUsize::new(0));

        // 10 observers that count their runs
        for _ in 0..10 {
            let shared = executions.clone();
            property.subscribe_async(move |_, _| {
                let runs = shared.clone();
                async move {
                    runs.fetch_add(1, Ordering::SeqCst);
                    sleep(Duration::from_millis(10)).await;
                }
            })?;
        }

        // Three notification rounds with a pause after each
        for _ in 0..3 {
            property.set_async(42).await?;
            sleep(Duration::from_millis(100)).await;
        }

        // 10 observers * 3 rounds = 30 runs
        let total = executions.load(Ordering::SeqCst);
        assert_eq!(total, 30, "Expected 30 executions, got {}", total);

        Ok(())
    }

    /// Filtered async observers are subject to the same concurrency limit.
    #[tokio::test]
    async fn test_filtered_async_observers_respect_limit() -> Result<(), PropertyError> {
        let property = ObservableProperty::new_with_config(
            0,
            PropertyConfig {
                max_observers: 100,
                max_pending_notifications: 100,
                observer_timeout_ms: 5000,
                max_concurrent_async_tasks: 3,
            },
        );
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        // 10 filtered async observers
        for _ in 0..10 {
            let in_flight = in_flight.clone();
            let peak = peak.clone();

            property.subscribe_async_filtered(
                move |_, _| {
                    let in_flight = in_flight.clone();
                    let peak = peak.clone();

                    async move {
                        let now_running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                        peak.fetch_max(now_running, Ordering::SeqCst);
                        sleep(Duration::from_millis(50)).await;
                        in_flight.fetch_sub(1, Ordering::SeqCst);
                    }
                },
                // Filter: only even new values pass
                |_, &new| new % 2 == 0,
            )?;
        }

        // An even value passes the filter of every observer
        property.set_async(100).await?;
        sleep(Duration::from_millis(200)).await;

        let max_reached = peak.load(Ordering::SeqCst);
        println!("Max concurrent filtered async tasks: {}", max_reached);

        assert!(
            max_reached <= 3,
            "Expected max concurrent tasks <= 3, got {}",
            max_reached
        );

        Ok(())
    }

    /// Smoke test: 200 async observers under the default configuration must
    /// not make `set_async` hang.
    #[tokio::test]
    async fn test_default_concurrent_limit() -> Result<(), PropertyError> {
        // NOTE(review): the original comment states the default
        // max_concurrent_async_tasks is 100 - confirm against PropertyConfig::default
        let property = ObservableProperty::new(0);

        // 200 async observers, each busy for 50ms
        for _ in 0..200 {
            property.subscribe_async(|_, _| async move {
                sleep(Duration::from_millis(50)).await;
            })?;
        }

        // Completing at all is the success criterion; nothing else is asserted
        property.set_async(42).await?;
        sleep(Duration::from_millis(300)).await;

        Ok(())
    }

    /// Sync and async observers registered side by side must all run, even
    /// with a low async concurrency limit.
    #[tokio::test]
    async fn test_mixed_sync_and_async_observers() -> Result<(), PropertyError> {
        let property = ObservableProperty::new_with_config(
            0,
            PropertyConfig {
                max_observers: 100,
                max_pending_notifications: 100,
                observer_timeout_ms: 5000,
                max_concurrent_async_tasks: 2,
            },
        );
        let sync_count = Arc::new(AtomicUsize::new(0));
        let async_count = Arc::new(AtomicUsize::new(0));

        // 5 sync observers (these are not limited by semaphore)
        for _ in 0..5 {
            let hits = sync_count.clone();
            property.subscribe(Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }))?;
        }

        // 5 async observers (these ARE limited by semaphore)
        for _ in 0..5 {
            let shared = async_count.clone();
            property.subscribe_async(move |_, _| {
                let hits = shared.clone();
                async move {
                    sleep(Duration::from_millis(20)).await;
                    hits.fetch_add(1, Ordering::SeqCst);
                }
            })?;
        }

        property.set_async(100).await?;
        sleep(Duration::from_millis(150)).await;

        // Every sync observer has run
        assert_eq!(sync_count.load(Ordering::SeqCst), 5);

        // Every async observer has run too, despite the limit of 2
        assert_eq!(async_count.load(Ordering::SeqCst), 5);

        Ok(())
    }
}