observable_property_tokio/
lib.rs

1//! # Observable Property with Tokio
2//!
3//! A thread-safe, async-compatible observable property implementation for Rust that allows you to
4//! observe changes to values using Tokio for asynchronous operations.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access with optimized locking
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with Tokio tasks
12//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property_tokio::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), observable_property_tokio::PropertyError> {
23//!     // Create an observable property
24//!     let property = ObservableProperty::new(42);
25//!
26//!     // Subscribe to changes
27//!     let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
28//!         println!("Value changed from {} to {}", old_value, new_value);
29//!     }))?;
30//!
31//!     // Change the value (triggers observer)
32//!     property.set(100)?;
33//!
34//!     // For async notification (uses Tokio)
35//!     property.set_async(200).await?;
36//!
37//!     // Unsubscribe when done
38//!     property.unsubscribe(observer_id)?;
39//!
40//!     Ok(())
41//! }
42//! ```
43//!
44//! ## Multi-threading Example with Tokio
45//!
46//! ```rust
47//! use observable_property_tokio::ObservableProperty;
48//! use std::sync::Arc;
49//! use tokio::task;
50//!
51//! #[tokio::main]
52//! async fn main() -> Result<(), observable_property_tokio::PropertyError> {
53//!     let property = Arc::new(ObservableProperty::new(0));
54//!     let property_clone = property.clone();
55//!
56//!     // Subscribe from one task
57//!     property.subscribe(Arc::new(|old, new| {
58//!         println!("Value changed: {} -> {}", old, new);
59//!     }))?;
60//!
61//!     // Modify from another task
62//!     task::spawn(async move {
63//!         property_clone.set(42)?;
64//!         Ok::<_, observable_property_tokio::PropertyError>(())
65//!     }).await??;
66//!
67//!     Ok(())
68//! }
69//! ```
70
71use std::collections::HashMap;
72use std::fmt;
73use std::panic;
74use std::sync::Arc;
75use std::time::{SystemTime, UNIX_EPOCH};
76use parking_lot::RwLock;
77use thiserror::Error;
78use tokio::task::{self, JoinError};
79
80/// Errors that can occur when working with ObservableProperty
81#[derive(Error, Debug, Clone)]
82pub enum PropertyError {
83    /// Failed to acquire a read lock on the property
84    #[error("Failed to acquire read lock during '{operation}': {context}")]
85    ReadLockError {
86        /// The operation being attempted
87        operation: String,
88        /// Context describing what operation was being attempted
89        context: String,
90        /// Timestamp when error occurred (milliseconds since epoch)
91        timestamp_ms: u64,
92    },
93
94    /// Failed to acquire a write lock on the property
95    #[error("Failed to acquire write lock during '{operation}': {context}")]
96    WriteLockError {
97        /// The operation being attempted
98        operation: String,
99        /// Context describing what operation was being attempted
100        context: String,
101        /// Timestamp when error occurred (milliseconds since epoch)
102        timestamp_ms: u64,
103    },
104
105    /// Attempted to unsubscribe an observer that doesn't exist
106    #[error("Observer with ID {id} not found")]
107    ObserverNotFound {
108        /// The ID of the observer that wasn't found
109        id: ObserverId,
110    },
111
112    /// The property's lock has been poisoned due to a panic in another thread
113    #[error("Lock poisoned during '{operation}': {context}")]
114    LockPoisoned {
115        /// The operation that encountered the poisoned lock
116        operation: String,
117        /// Additional context about the poisoned lock
118        context: String,
119        /// Timestamp when error occurred (milliseconds since epoch)
120        timestamp_ms: u64,
121    },
122
123    /// An observer function panicked during execution
124    #[error("Observer {observer_id} panicked: {error}")]
125    ObserverPanic {
126        /// The ID of the observer that panicked
127        observer_id: ObserverId,
128        /// The panic error message
129        error: String,
130        /// Timestamp when error occurred (milliseconds since epoch)
131        timestamp_ms: u64,
132    },
133
134    /// An observer function encountered an error during execution
135    #[error("Observer execution failed: {reason}")]
136    ObserverError {
137        /// Description of what went wrong
138        reason: String,
139    },
140
141    /// A Tokio-related error occurred
142    #[error("Tokio runtime error: {reason}")]
143    TokioError {
144        /// Description of what went wrong
145        reason: String,
146    },
147
148    /// A task join error occurred
149    #[error("Task join error: {0}")]
150    JoinError(String),
151
152    /// Maximum capacity has been exceeded
153    #[error("Capacity exceeded: current={current}, max={max}, resource={resource}")]
154    CapacityExceeded {
155        /// Current count
156        current: usize,
157        /// Maximum allowed
158        max: usize,
159        /// The resource that exceeded capacity
160        resource: String,
161    },
162
163    /// Operation exceeded timeout threshold
164    #[error("Operation '{operation}' timed out: {elapsed_ms}ms > {threshold_ms}ms")]
165    OperationTimeout {
166        /// The operation that timed out
167        operation: String,
168        /// Actual elapsed time in milliseconds
169        elapsed_ms: u64,
170        /// Timeout threshold in milliseconds
171        threshold_ms: u64,
172    },
173
174    /// The property is shutting down and not accepting new operations
175    #[error("Property is shutting down")]
176    ShutdownInProgress,
177}
178
179impl PropertyError {
180    /// Get a diagnostic string suitable for logging and monitoring
181    ///
182    /// This method returns a structured string containing all relevant
183    /// diagnostic information about the error, including timestamps,
184    /// operation context, and performance metrics where applicable.
185    ///
186    /// # Returns
187    ///
188    /// A formatted string containing diagnostic information
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use observable_property_tokio::PropertyError;
194    ///
195    /// let error = PropertyError::OperationTimeout {
196    ///     operation: "notify_observers".to_string(),
197    ///     elapsed_ms: 5500,
198    ///     threshold_ms: 5000,
199    /// };
200    ///
201    /// let diagnostic = error.diagnostic_info();
202    /// assert!(diagnostic.contains("notify_observers"));
203    /// assert!(diagnostic.contains("elapsed_ms=5500"));
204    /// ```
205    pub fn diagnostic_info(&self) -> String {
206        match self {
207            Self::ReadLockError { operation, context, timestamp_ms } => {
208                format!(
209                    "READ_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
210                    operation, context, timestamp_ms
211                )
212            }
213            Self::WriteLockError { operation, context, timestamp_ms } => {
214                format!(
215                    "WRITE_LOCK_ERROR | operation={} | context={} | timestamp_ms={}",
216                    operation, context, timestamp_ms
217                )
218            }
219            Self::LockPoisoned { operation, context, timestamp_ms } => {
220                format!(
221                    "LOCK_POISONED | operation={} | context={} | timestamp_ms={}",
222                    operation, context, timestamp_ms
223                )
224            }
225            Self::ObserverPanic { observer_id, error, timestamp_ms } => {
226                format!(
227                    "OBSERVER_PANIC | observer_id={} | error={} | timestamp_ms={}",
228                    observer_id, error, timestamp_ms
229                )
230            }
231            Self::ObserverNotFound { id } => {
232                format!("OBSERVER_NOT_FOUND | id={}", id)
233            }
234            Self::CapacityExceeded { current, max, resource } => {
235                format!(
236                    "CAPACITY_EXCEEDED | resource={} | current={} | max={} | utilization={:.1}%",
237                    resource, current, max, (*current as f64 / *max as f64) * 100.0
238                )
239            }
240            Self::OperationTimeout { operation, elapsed_ms, threshold_ms } => {
241                format!(
242                    "OPERATION_TIMEOUT | operation={} | elapsed_ms={} | threshold_ms={} | overage_ms={}",
243                    operation, elapsed_ms, threshold_ms, elapsed_ms.saturating_sub(*threshold_ms)
244                )
245            }
246            Self::ShutdownInProgress => {
247                "SHUTDOWN_IN_PROGRESS | property is shutting down".to_string()
248            }
249            Self::ObserverError { reason } => {
250                format!("OBSERVER_ERROR | reason={}", reason)
251            }
252            Self::TokioError { reason } => {
253                format!("TOKIO_ERROR | reason={}", reason)
254            }
255            Self::JoinError(msg) => {
256                format!("JOIN_ERROR | message={}", msg)
257            }
258        }
259    }
260
261    /// Get the current timestamp in milliseconds since UNIX epoch
262    ///
263    /// This is a helper function used internally for error creation
264    fn current_timestamp_ms() -> u64 {
265        SystemTime::now()
266            .duration_since(UNIX_EPOCH)
267            .map(|d| d.as_millis() as u64)
268            .unwrap_or(0)
269    }
270
271    /// Create a ReadLockError with current timestamp
272    pub fn read_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
273        Self::ReadLockError {
274            operation: operation.into(),
275            context: context.into(),
276            timestamp_ms: Self::current_timestamp_ms(),
277        }
278    }
279
280    /// Create a WriteLockError with current timestamp
281    pub fn write_lock_error(operation: impl Into<String>, context: impl Into<String>) -> Self {
282        Self::WriteLockError {
283            operation: operation.into(),
284            context: context.into(),
285            timestamp_ms: Self::current_timestamp_ms(),
286        }
287    }
288
289    /// Create a LockPoisoned error with current timestamp
290    pub fn lock_poisoned(operation: impl Into<String>, context: impl Into<String>) -> Self {
291        Self::LockPoisoned {
292            operation: operation.into(),
293            context: context.into(),
294            timestamp_ms: Self::current_timestamp_ms(),
295        }
296    }
297
298    /// Create an ObserverPanic error with current timestamp
299    pub fn observer_panic(observer_id: ObserverId, error: impl Into<String>) -> Self {
300        Self::ObserverPanic {
301            observer_id,
302            error: error.into(),
303            timestamp_ms: Self::current_timestamp_ms(),
304        }
305    }
306}
307
/// Tunable limits for an `ObservableProperty`.
///
/// The limits cap how many observers may be registered and how many async
/// observer tasks may run at once, which helps guard against resource
/// exhaustion.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::{ObservableProperty, PropertyConfig};
///
/// let config = PropertyConfig {
///     max_observers: 100,
///     max_pending_notifications: 50,
///     observer_timeout_ms: 5000,
///     max_concurrent_async_tasks: 50,
/// };
///
/// let property = ObservableProperty::new_with_config(42, config);
/// ```
#[derive(Debug, Clone)]
pub struct PropertyConfig {
    /// Upper bound on the number of registered observers.
    ///
    /// Once this many observers exist, further subscription attempts fail
    /// with `PropertyError::CapacityExceeded`.
    ///
    /// Default: 1000
    pub max_observers: usize,

    /// Upper bound on queued async notifications per observer
    /// (reserved for future use).
    ///
    /// Intended to keep queued notifications from exhausting memory.
    ///
    /// Default: 100
    pub max_pending_notifications: usize,

    /// Observer execution timeout in milliseconds (reserved for future use).
    ///
    /// Observers running longer than this may be logged or flagged for debugging.
    ///
    /// Default: 5000ms
    pub observer_timeout_ms: u64,

    /// Upper bound on async observer tasks executing at the same time.
    ///
    /// A semaphore enforces the bound: when every slot is taken, further async
    /// notifications wait until a running one finishes.
    ///
    /// Default: 100
    pub max_concurrent_async_tasks: usize,
}

impl Default for PropertyConfig {
    /// Returns the default limits: 1000 observers, 100 pending notifications,
    /// a 5000 ms observer timeout and 100 concurrent async tasks.
    fn default() -> Self {
        PropertyConfig {
            max_observers: 1_000,
            max_pending_notifications: 100,
            observer_timeout_ms: 5_000,
            max_concurrent_async_tasks: 100,
        }
    }
}
372
/// Summary produced when a property has been shut down.
///
/// Records how many observers were cleared, how long the shutdown took,
/// whether it finished inside the timeout, and when it was started.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::{ObservableProperty, PropertyConfig};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let property = ObservableProperty::new(42);
///
///     // ... use property ...
///
///     let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
///     println!("Shutdown complete: {:?}", report);
///     Ok(())
/// }
/// ```
#[derive(Debug, Clone)]
pub struct ShutdownReport {
    /// How many observers were cleared by the shutdown.
    pub observers_cleared: usize,

    /// How long the shutdown operation took.
    pub shutdown_duration: std::time::Duration,

    /// `true` when the shutdown finished inside the timeout period.
    pub completed_within_timeout: bool,

    /// When the shutdown was started (milliseconds since epoch).
    pub initiated_at_ms: u64,
}

impl ShutdownReport {
    /// Formats the report as one pipe-delimited line suitable for logging.
    ///
    /// The duration is emitted in whole milliseconds.
    pub fn diagnostic_info(&self) -> String {
        let Self {
            observers_cleared,
            shutdown_duration,
            completed_within_timeout,
            initiated_at_ms,
        } = self;
        let duration_ms = shutdown_duration.as_millis();

        format!(
            "SHUTDOWN_COMPLETE | observers_cleared={} | duration_ms={} | within_timeout={} | initiated_at_ms={}",
            observers_cleared, duration_ms, completed_within_timeout, initiated_at_ms
        )
    }
}
422
/// Shared callback invoked when a property's value changes.
///
/// The callback receives two references: the previous value first, then the
/// new value. It is reference-counted so the same callback can be handed to
/// several threads or tasks.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Sync + Send>;
425
/// Opaque handle identifying one registered observer.
///
/// Wraps a `usize`; the value can be extracted or supplied through the
/// `From` conversions below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObserverId(pub(crate) usize);

impl From<ObserverId> for usize {
    /// Extracts the numeric value wrapped by an `ObserverId`.
    ///
    /// Kept for backward compatibility with code that treats observer IDs as
    /// plain numbers.
    ///
    /// # Examples
    ///
    /// ```
    /// use observable_property_tokio::ObserverId;
    ///
    /// let id = ObserverId::from(42); // For illustration - actual IDs come from subscribe()
    /// let value: usize = id.into();
    /// assert_eq!(value, 42);
    /// ```
    fn from(id: ObserverId) -> Self {
        let ObserverId(raw) = id;
        raw
    }
}

impl From<usize> for ObserverId {
    /// Wraps a raw number in an `ObserverId`.
    ///
    /// Mainly useful for backward compatibility and tests; in normal use the
    /// library hands out IDs from `subscribe()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use observable_property_tokio::ObserverId;
    ///
    /// let id = ObserverId::from(42);
    /// ```
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl fmt::Display for ObserverId {
    /// Writes the wrapped number in decimal. Width/alignment flags on the
    /// outer formatter are not forwarded to the number.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.0))
    }
}
473
474/// A thread-safe observable property that notifies observers when its value changes
475///
476/// This type wraps a value of type `T` and allows multiple observers to be notified
477/// whenever the value is modified. All operations are thread-safe and can be called
478/// from multiple threads concurrently. Asynchronous operations are powered by Tokio.
479///
480/// # Type Requirements
481///
482/// The generic type `T` must implement:
483/// - `Clone`: Required for returning values and passing them to observers
484/// - `Send`: Required for transferring between threads
485/// - `Sync`: Required for concurrent access from multiple threads  
486/// - `'static`: Required for observer callbacks that may outlive the original scope
487///
488/// # Examples
489///
490/// ```rust
491/// use observable_property_tokio::ObservableProperty;
492/// use std::sync::Arc;
493///
494/// #[tokio::main]
495/// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
496///     let property = ObservableProperty::new("initial".to_string());
497///
498///     let observer_id = property.subscribe(Arc::new(|old, new| {
499///         println!("Changed from '{}' to '{}'", old, new);
500///     }))?;
501///
502///     property.set("updated".to_string())?; // Prints: Changed from 'initial' to 'updated'
503///
504///     // Async version
505///     property.set_async("async update".to_string()).await?;
506///
507///     property.unsubscribe(observer_id)?;
508///
509///     Ok(())
510/// }
511/// ```
512pub struct ObservableProperty<T> {
513    inner: Arc<RwLock<InnerProperty<T>>>,
514    config: PropertyConfig,
515    async_task_semaphore: Arc<tokio::sync::Semaphore>,
516}
517
518struct InnerProperty<T> {
519    value: T,
520    observers: HashMap<ObserverId, Observer<T>>,
521    next_id: usize,
522}
523
524pub struct PropertyHandle<T: Clone + Send + Sync + 'static> {
525    inner: Arc<RwLock<InnerProperty<T>>>,
526}
527
528impl<T: Clone + Send + Sync + 'static> PropertyHandle<T> {
529    /// Removes an observer by its ID, ignoring if it doesn't exist
530    ///
531    /// This is a convenience method that doesn't return an error if the observer doesn't exist.
532    ///
533    /// # Arguments
534    ///
535    /// * `id` - The observer ID returned by `subscribe()`
536    ///
537    /// # Returns
538    ///
539    /// `true` if an observer was removed, `false` if no observer with that ID existed
540    pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
541        let mut inner = self.inner.write();
542        inner.observers.remove(&id).is_some()
543    }
544}
545
546pub struct Subscription<T: Clone + Send + Sync + 'static> {
547    inner: Arc<RwLock<InnerProperty<T>>>,
548    id: ObserverId,
549}
550
551impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
552    fn drop(&mut self) {
553        let mut inner = self.inner.write();
554        inner.observers.remove(&self.id);
555    }
556}
557
558impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
559    /// Creates a new observable property with the given initial value
560    ///
561    /// Uses default configuration with max_observers=1000.
562    /// For custom limits, use `new_with_config()`.
563    ///
564    /// # Arguments
565    ///
566    /// * `initial_value` - The starting value for this property
567    ///
568    /// # Examples
569    ///
570    /// ```rust
571    /// use observable_property_tokio::ObservableProperty;
572    ///
573    /// let property = ObservableProperty::new(42);
574    /// assert_eq!(property.get().unwrap(), 42);
575    /// ```
576    pub fn new(initial_value: T) -> Self {
577        Self::new_with_config(initial_value, PropertyConfig::default())
578    }
579
580    /// Creates a new observable property with custom configuration
581    ///
582    /// This allows you to specify limits on the number of observers and other
583    /// behavioral parameters to prevent resource exhaustion.
584    ///
585    /// # Arguments
586    ///
587    /// * `initial_value` - The starting value for this property
588    /// * `config` - Configuration options for backpressure and resource limits
589    ///
590    /// # Examples
591    ///
592    /// ```rust
593    /// use observable_property_tokio::{ObservableProperty, PropertyConfig};
594    ///
595    /// let config = PropertyConfig {
596    ///     max_observers: 50,
597    ///     max_pending_notifications: 100,
598    ///     observer_timeout_ms: 3000,
599    ///     max_concurrent_async_tasks: 50,
600    /// };
601    ///
602    /// let property = ObservableProperty::new_with_config(0, config);
603    /// assert_eq!(property.get().unwrap(), 0);
604    /// ```
605    pub fn new_with_config(initial_value: T, config: PropertyConfig) -> Self {
606        let semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_concurrent_async_tasks));
607        
608        Self {
609            inner: Arc::new(RwLock::new(InnerProperty {
610                value: initial_value,
611                observers: HashMap::new(),
612                next_id: 0,
613            })),
614            config,
615            async_task_semaphore: semaphore,
616        }
617    }
618
619    /// Gets the current value of the property
620    ///
621    /// This method acquires a read lock, which allows multiple concurrent readers.
622    ///
623    /// # Returns
624    ///
625    /// `Ok(T)` containing a clone of the current value
626    ///
627    /// # Examples
628    ///
629    /// ```rust
630    /// use observable_property_tokio::ObservableProperty;
631    ///
632    /// let property = ObservableProperty::new("hello".to_string());
633    /// assert_eq!(property.get().unwrap(), "hello");
634    /// ```
635    pub fn get(&self) -> Result<T, PropertyError> {
636        Ok(self.inner.read().value.clone())
637    }
638
639    /// Gets a reference to the current value of the property
640    ///
641    /// This method acquires a read lock and returns a guard that derefs to the value,
642    /// allowing you to read the value without cloning it.
643    ///
644    /// # Returns
645    ///
646    /// A RAII guard that derefs to a reference of the value
647    ///
648    /// # Examples
649    ///
650    /// ```rust
651    /// use observable_property_tokio::ObservableProperty;
652    ///
653    /// let property = ObservableProperty::new("hello".to_string());
654    /// let value_ref = property.get_ref();
655    /// assert_eq!(*value_ref, "hello");
656    /// // Lock is released when value_ref goes out of scope
657    /// ```
658    pub fn get_ref(&self) -> impl std::ops::Deref<Target = T> + '_ {
659        parking_lot::RwLockReadGuard::map(self.inner.read(), |inner| &inner.value)
660    }
661
662    /// Sets the property to a new value and notifies all observers
663    ///
664    /// This method will:
665    /// 1. Acquire a write lock (blocking other readers/writers)
666    /// 2. Update the value and capture a snapshot of observers
667    /// 3. Release the lock
668    /// 4. Notify all observers sequentially with the old and new values
669    ///
670    /// Observer notifications are wrapped in panic recovery to prevent one
671    /// misbehaving observer from affecting others.
672    ///
673    /// # Arguments
674    ///
675    /// * `new_value` - The new value to set
676    ///
677    /// # Returns
678    ///
679    /// `Ok(())` if successful
680    ///
681    /// # Examples
682    ///
683    /// ```rust
684    /// use observable_property_tokio::ObservableProperty;
685    /// use std::sync::Arc;
686    ///
687    /// #[tokio::main]
688    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
689    ///     let property = ObservableProperty::new(10);
690    ///
691    ///     property.subscribe(Arc::new(|old, new| {
692    ///         println!("Value changed from {} to {}", old, new);
693    ///     }))?;
694    ///
695    ///     property.set(20)?; // Triggers observer notification
696    ///
697    ///     Ok(())
698    /// }
699    /// ```
700    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
701        let (old_value, observers_snapshot) = {
702            let mut inner = self.inner.write();
703
704            let old_value = inner.value.clone();
705            inner.value = new_value.clone();
706            let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
707            (old_value, observers_snapshot)
708        };
709
710        for observer in observers_snapshot {
711            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
712                observer(&old_value, &new_value);
713            })) {
714                eprintln!("Observer panic: {:?}", e);
715            }
716        }
717
718        Ok(())
719    }
720
721    /// Sets the property to a new value and notifies observers asynchronously using Tokio tasks
722    ///
723    /// This method is similar to `set()` but spawns observers in individual Tokio tasks
724    /// for non-blocking operation. This is useful when observers might perform
725    /// time-consuming operations.
726    ///
727    /// # Arguments
728    ///
729    /// * `new_value` - The new value to set
730    ///
731    /// # Returns
732    ///
733    /// `Ok(())` if successful
734    ///
735    /// # Examples
736    ///
737    /// ```rust
738    /// use observable_property_tokio::ObservableProperty;
739    /// use std::sync::Arc;
740    /// use tokio::time::{sleep, Duration};
741    ///
742    /// #[tokio::main]
743    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
744    ///     let property = ObservableProperty::new(0);
745    ///
746    ///     property.subscribe(Arc::new(move |old, new| {
747    ///         // This observer does slow work but won't block the caller
748    ///         println!("Slow observer: {} -> {}", old, new);
749    ///     }))?;
750    ///
751    ///     // This returns immediately even though observer may be slow
752    ///     property.set_async(42).await?;
753    ///
754    ///     // Give time for observers to run
755    ///     sleep(Duration::from_millis(10)).await;
756    ///
757    ///     Ok(())
758    /// }
759    /// ```
760    pub async fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
761        let (old_value, observers_snapshot) = {
762            let mut inner = self.inner.write();
763
764            let old_value = inner.value.clone();
765            inner.value = new_value.clone();
766            let observers_snapshot: Vec<Observer<T>> = inner.observers.values().cloned().collect();
767            (old_value, observers_snapshot)
768        };
769
770        if observers_snapshot.is_empty() {
771            return Ok(());
772        }
773
774        // Spawn a separate Tokio task for each observer with semaphore-based connection pooling
775        let mut tasks = Vec::with_capacity(observers_snapshot.len());
776
777        for observer in observers_snapshot {
778            let old_val = old_value.clone();
779            let new_val = new_value.clone();
780            let semaphore = Arc::clone(&self.async_task_semaphore);
781
782            let task = task::spawn(async move {
783                // Acquire permit from semaphore before executing observer
784                let _permit = semaphore.acquire().await.expect("Semaphore closed");
785                
786                if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
787                    observer(&old_val, &new_val);
788                })) {
789                    eprintln!("Observer panic in task: {:?}", e);
790                }
791                // Permit is automatically released when _permit is dropped
792            });
793
794            tasks.push(task);
795        }
796
797        // Wait for all tasks to complete to prevent resource leaks
798        for task in tasks {
799            task.await.map_err(|e| PropertyError::JoinError(format!("Task join error: {}", e)))?;
800        }
801
802        Ok(())
803    }
804
805    /// Update the property value using a closure that has access to the current value
806    ///
807    /// This is a more ergonomic way to update a property based on its current value,
808    /// without having to call `get()` and `set()` separately.
809    ///
810    /// # Arguments
811    ///
812    /// * `update_fn` - A function that takes the current value and returns a new value
813    ///
814    /// # Returns
815    ///
816    /// `Ok(())` if successful
817    ///
818    /// # Examples
819    ///
820    /// ```rust
821    /// use observable_property_tokio::ObservableProperty;
822    ///
823    /// let property = ObservableProperty::new(10);
824    ///
825    /// // Double the value
826    /// property.update(|current| current * 2)?;
827    /// assert_eq!(property.get()?, 20);
828    ///
829    /// // Add 5
830    /// property.update(|current| current + 5)?;
831    /// assert_eq!(property.get()?, 25);
832    /// # Ok::<(), observable_property_tokio::PropertyError>(())
833    /// ```
834    pub fn update<F>(&self, update_fn: F) -> Result<(), PropertyError>
835    where
836        F: FnOnce(T) -> T,
837    {
838        let new_value = update_fn(self.get()?);
839        self.set(new_value)
840    }
841
842    /// Update the property value asynchronously using a closure
843    ///
844    /// Like `update()` but uses `set_async()` for the update.
845    ///
846    /// # Arguments
847    ///
848    /// * `update_fn` - A function that takes the current value and returns a new value
849    ///
850    /// # Returns
851    ///
852    /// `Ok(())` if successful
853    ///
854    /// # Examples
855    ///
856    /// ```rust
857    /// use observable_property_tokio::ObservableProperty;
858    ///
859    /// #[tokio::main]
860    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
861    ///     let property = ObservableProperty::new("hello".to_string());
862    ///
863    ///     property.update_async(|current| format!("{} world", current)).await?;
864    ///     assert_eq!(property.get()?, "hello world");
865    ///
866    ///     Ok(())
867    /// }
868    /// ```
869    pub async fn update_async<F>(&self, update_fn: F) -> Result<(), PropertyError>
870    where
871        F: FnOnce(T) -> T,
872    {
873        let new_value = update_fn(self.get()?);
874        self.set_async(new_value).await
875    }
876
877    /// Subscribes an observer function to be called when the property changes
878    ///
879    /// The observer function will be called with the old and new values whenever
880    /// the property is modified via `set()` or `set_async()`.
881    ///
882    /// # Arguments
883    ///
884    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
885    ///
886    /// # Returns
887    ///
888    /// `Ok(ObserverId)` containing a unique identifier for this observer
889    ///
890    /// # Examples
891    ///
892    /// ```rust
893    /// use observable_property_tokio::ObservableProperty;
894    /// use std::sync::Arc;
895    ///
896    /// #[tokio::main]
897    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
898    ///     let property = ObservableProperty::new(0);
899    ///
900    ///     let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
901    ///         println!("Property changed from {} to {}", old_value, new_value);
902    ///     }))?;
903    ///
904    ///     // Later, unsubscribe using the returned ID
905    ///     property.unsubscribe(observer_id)?;
906    ///
907    ///     Ok(())
908    /// }
909    /// ```
910    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
911        let mut inner = self.inner.write();
912
913        // Check if we've reached the maximum number of observers
914        if inner.observers.len() >= self.config.max_observers {
915            return Err(PropertyError::CapacityExceeded {
916                current: inner.observers.len(),
917                max: self.config.max_observers,
918                resource: "observers".to_string(),
919            });
920        }
921
922        let id = ObserverId(inner.next_id);
923        inner.next_id += 1;
924        inner.observers.insert(id, observer);
925        Ok(id)
926    }
927
928    /// Removes an observer by its ID
929    ///
930    /// # Arguments
931    ///
932    /// * `id` - The observer ID returned by `subscribe()`
933    ///
934    /// # Returns
935    ///
936    /// `Ok(())` if the observer was removed, or `Err(PropertyError::ObserverNotFound)`
937    /// if no observer with that ID existed.
938    ///
939    /// # Examples
940    ///
941    /// ```rust
942    /// use observable_property_tokio::ObservableProperty;
943    /// use std::sync::Arc;
944    ///
945    /// #[tokio::main]
946    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
947    ///     let property = ObservableProperty::new(0);
948    ///     let id = property.subscribe(Arc::new(|_, _| {}))?;
949    ///
950    ///     // Remove the observer
951    ///     property.unsubscribe(id)?;
952    ///
953    ///     // Trying to remove again fails with ObserverNotFound
954    ///     match property.unsubscribe(id) {
955    ///         Err(observable_property_tokio::PropertyError::ObserverNotFound { .. }) => {
956    ///             println!("Observer was already removed, as expected");
957    ///         }
958    ///         _ => panic!("Expected ObserverNotFound error"),
959    ///     }
960    ///
961    ///     Ok(())
962    /// }
963    /// ```
964    pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
965        let mut inner = self.inner.write();
966
967        if inner.observers.remove(&id).is_some() {
968            Ok(())
969        } else {
970            Err(PropertyError::ObserverNotFound { id })
971        }
972    }
973
974    /// Removes an observer by its ID, ignoring if it doesn't exist
975    ///
976    /// This is a convenience method that doesn't return an error if the observer doesn't exist.
977    ///
978    /// # Arguments
979    ///
980    /// * `id` - The observer ID returned by `subscribe()`
981    ///
982    /// # Returns
983    ///
984    /// `true` if an observer was removed, `false` if no observer with that ID existed
985    ///
986    /// # Examples
987    ///
988    /// ```rust
989    /// use observable_property_tokio::ObservableProperty;
990    /// use std::sync::Arc;
991    ///
992    /// #[tokio::main]
993    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
994    ///     let property = ObservableProperty::new(0);
995    ///     let id = property.subscribe(Arc::new(|_, _| {}))?;
996    ///
997    ///     // Remove the observer
998    ///     assert!(property.try_unsubscribe(id));
999    ///
1000    ///     // Trying to remove again just returns false
1001    ///     assert!(!property.try_unsubscribe(id));
1002    ///
1003    ///     Ok(())
1004    /// }
1005    /// ```
1006    pub fn try_unsubscribe(&self, id: ObserverId) -> bool {
1007        let mut inner = self.inner.write();
1008        inner.observers.remove(&id).is_some()
1009    }
1010
1011    /// Returns the number of active observers for this property
1012    ///
1013    /// # Returns
1014    ///
1015    /// The number of observers currently subscribed to this property
1016    ///
1017    /// # Examples
1018    ///
1019    /// ```rust
1020    /// use observable_property_tokio::ObservableProperty;
1021    /// use std::sync::Arc;
1022    ///
1023    /// #[tokio::main]
1024    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1025    ///     let property = ObservableProperty::new(0);
1026    ///
1027    ///     assert_eq!(property.observer_count(), 0);
1028    ///
1029    ///     let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1030    ///     let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1031    ///
1032    ///     assert_eq!(property.observer_count(), 2);
1033    ///
1034    ///     property.unsubscribe(id1)?;
1035    ///
1036    ///     assert_eq!(property.observer_count(), 1);
1037    ///
1038    ///     property.unsubscribe(id2)?;
1039    ///
1040    ///     assert_eq!(property.observer_count(), 0);
1041    ///
1042    ///     Ok(())
1043    /// }
1044    /// ```
1045    pub fn observer_count(&self) -> usize {
1046        self.inner.read().observers.len()
1047    }
1048
1049    /// Subscribes an observer that only gets called when a filter condition is met
1050    ///
1051    /// This is useful for observing only specific types of changes, such as
1052    /// when a value increases or crosses a threshold.
1053    ///
1054    /// # Arguments
1055    ///
1056    /// * `observer` - The observer function to call when the filter passes
1057    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1058    ///
1059    /// # Returns
1060    ///
1061    /// `Ok(ObserverId)` for the filtered observer
1062    ///
1063    /// # Examples
1064    ///
1065    /// ```rust
1066    /// use observable_property_tokio::ObservableProperty;
1067    /// use std::sync::Arc;
1068    ///
1069    /// #[tokio::main]
1070    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1071    ///     let property = ObservableProperty::new(0);
1072    ///
1073    ///     // Only notify when value increases
1074    ///     let id = property.subscribe_filtered(
1075    ///         Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1076    ///         |old, new| new > old
1077    ///     )?;
1078    ///
1079    ///     property.set(10)?; // Triggers observer (0 -> 10)
1080    ///     property.set(5)?;  // Does NOT trigger observer (10 -> 5)
1081    ///     property.set(15)?; // Triggers observer (5 -> 15)
1082    ///
1083    ///     Ok(())
1084    /// }
1085    /// ```
1086    pub fn subscribe_filtered<F>(
1087        &self,
1088        observer: Observer<T>,
1089        filter: F,
1090    ) -> Result<ObserverId, PropertyError>
1091    where
1092        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1093    {
1094        let filter = Arc::new(filter);
1095        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1096            if filter(old_val, new_val) {
1097                observer(old_val, new_val);
1098            }
1099        });
1100
1101        self.subscribe(filtered_observer)
1102    }
1103
1104    /// Subscribe with an async handler that will be executed as a Tokio task
1105    ///
1106    /// This version allows you to use async functions as observers. The handler is
1107    /// spawned as a Tokio task whenever the property changes.
1108    ///
1109    /// # Arguments
1110    ///
1111    /// * `handler` - An async function or closure that takes old and new values
1112    ///
1113    /// # Returns
1114    ///
1115    /// `Ok(ObserverId)` containing a unique identifier for this observer
1116    ///
1117    /// # Examples
1118    ///
1119    /// ```rust
1120    /// use observable_property_tokio::ObservableProperty;
1121    /// use std::sync::Arc;
1122    /// use tokio::time::{sleep, Duration};
1123    ///
1124    /// #[tokio::main]
1125    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1126    ///     let property = ObservableProperty::new(0);
1127    ///
1128    ///     // This handler can perform async operations
1129    ///     property.subscribe_async(|old, new| async move {
1130    ///         // Simulate some async work
1131    ///         sleep(Duration::from_millis(10)).await;
1132    ///         println!("Async observer: {} -> {}", old, new);
1133    ///     })?;
1134    ///
1135    ///     property.set_async(42).await?;
1136    ///
1137    ///     // Give time for observers to complete
1138    ///     sleep(Duration::from_millis(20)).await;
1139    ///
1140    ///     Ok(())
1141    /// }
1142    /// ```
1143    pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
1144    where
1145        F: Fn(T, T) -> Fut + Send + Sync + 'static,
1146        Fut: std::future::Future<Output = ()> + Send + 'static,
1147    {
1148        // Wrap the handler in an Arc so we can clone it for each invocation
1149        let handler = Arc::new(handler);
1150        let semaphore = Arc::clone(&self.async_task_semaphore);
1151
1152        let observer = Arc::new(move |old: &T, new: &T| {
1153            let old_val = old.clone();
1154            let new_val = new.clone();
1155            // Clone the handler so we can move it into the task
1156            let handler_clone = Arc::clone(&handler);
1157            let semaphore_clone = Arc::clone(&semaphore);
1158
1159            tokio::spawn(async move {
1160                // Acquire permit from semaphore before executing async handler
1161                let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
1162                handler_clone(old_val, new_val).await;
1163                // Permit is automatically released when _permit is dropped
1164            });
1165        });
1166
1167        self.subscribe(observer)
1168    }
1169
1170    /// Subscribe with an async handler that is filtered
1171    ///
1172    /// Combines `subscribe_filtered` and `subscribe_async` to provide an async handler
1173    /// that only runs when the filter condition is met.
1174    ///
1175    /// # Arguments
1176    ///
1177    /// * `handler` - An async function or closure that takes old and new values
1178    /// * `filter` - A predicate function that decides if the handler should be called
1179    ///
1180    /// # Returns
1181    ///
1182    /// `Ok(ObserverId)` containing a unique identifier for this observer
1183    ///
1184    /// # Examples
1185    ///
1186    /// ```rust
1187    /// use observable_property_tokio::ObservableProperty;
1188    /// use tokio::time::{sleep, Duration};
1189    ///
1190    /// #[tokio::main]
1191    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1192    ///     let property = ObservableProperty::new(0);
1193    ///
1194    ///     // This async handler only runs when value increases
1195    ///     property.subscribe_async_filtered(
1196    ///         |old, new| async move {
1197    ///             sleep(Duration::from_millis(10)).await;
1198    ///             println!("Value increased: {} -> {}", old, new);
1199    ///         },
1200    ///         |old, new| new > old
1201    ///     )?;
1202    ///
1203    ///     property.set(10)?; // Triggers observer (0 -> 10)
1204    ///     property.set(5)?;  // Does NOT trigger observer (10 -> 5)
1205    ///     property.set(15)?; // Triggers observer (5 -> 15)
1206    ///
1207    ///     // Give time for observers to complete
1208    ///     sleep(Duration::from_millis(20)).await;
1209    ///
1210    ///     Ok(())
1211    /// }
1212    /// ```
1213    pub fn subscribe_async_filtered<F, Fut, Filt>(
1214        &self,
1215        handler: F,
1216        filter: Filt,
1217    ) -> Result<ObserverId, PropertyError>
1218    where
1219        F: Fn(T, T) -> Fut + Send + Sync + 'static,
1220        Fut: std::future::Future<Output = ()> + Send + 'static,
1221        Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
1222    {
1223        let filter = Arc::new(filter);
1224        let handler = Arc::new(handler);
1225        let semaphore = Arc::clone(&self.async_task_semaphore);
1226
1227        let observer = Arc::new(move |old: &T, new: &T| {
1228            if filter(old, new) {
1229                let old_val = old.clone();
1230                let new_val = new.clone();
1231                let handler_clone = Arc::clone(&handler);
1232                let semaphore_clone = Arc::clone(&semaphore);
1233
1234                tokio::spawn(async move {
1235                    // Acquire permit from semaphore before executing async handler
1236                    let _permit = semaphore_clone.acquire().await.expect("Semaphore closed");
1237                    handler_clone(old_val, new_val).await;
1238                    // Permit is automatically released when _permit is dropped
1239                });
1240            }
1241        });
1242
1243        self.subscribe(observer)
1244    }
1245
1246    /// Create a new ObservableProperty with transformation applied to the value
1247    ///
1248    /// This creates a derived property that tracks changes to the original property,
1249    /// but with a transformation applied. Changes to the original property are reflected
1250    /// in the derived property, but the derived property is read-only.
1251    ///
1252    /// # Arguments
1253    ///
1254    /// * `transform` - A function that converts from the source type to the target type
1255    ///
1256    /// # Returns
1257    ///
1258    /// `Ok(ObservableProperty<U>)` containing a new property that reflects the transformed value,
1259    /// or `Err(PropertyError)` if the initial value cannot be read or the observer cannot be subscribed
1260    ///
1261    /// # Examples
1262    ///
1263    /// ```rust
1264    /// use observable_property_tokio::ObservableProperty;
1265    /// use std::sync::Arc;
1266    ///
1267    /// #[tokio::main]
1268    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1269    ///     let original = ObservableProperty::new(42);
1270    ///
1271    ///     // Create a derived property that doubles the value
1272    ///     let doubled = original.map(|value| value * 2)?;
1273    ///
1274    ///     assert_eq!(doubled.get()?, 84);
1275    ///
1276    ///     // When original changes, doubled reflects the transformation
1277    ///     original.set(10)?;
1278    ///     assert_eq!(doubled.get()?, 20);
1279    ///
1280    ///     Ok(())
1281    /// }
1282    /// ```
1283    pub fn map<U, F>(&self, transform: F) -> Result<ObservableProperty<U>, PropertyError>
1284    where
1285        U: Clone + Send + Sync + 'static,
1286        F: Fn(&T) -> U + Send + Sync + 'static,
1287    {
1288        let transform = Arc::new(transform);
1289        let initial_value = transform(&self.get()?);
1290        let derived = ObservableProperty::new(initial_value);
1291
1292        let derived_clone = derived.clone();
1293        self.subscribe(Arc::new(move |_, new_value| {
1294            let transformed = transform(new_value);
1295            if let Err(e) = derived_clone.set(transformed) {
1296                eprintln!("Failed to update derived property: {}", e);
1297            }
1298        }))?;
1299
1300        Ok(derived)
1301    }
1302
1303    /// Removes all registered observers from this property
1304    ///
1305    /// This method clears all observers that have been registered via `subscribe()`,
1306    /// `subscribe_async()`, `subscribe_filtered()`, or `subscribe_async_filtered()`.
1307    /// After calling this method, no observers will be notified of value changes
1308    /// until new ones are registered.
1309    ///
1310    /// This is useful for cleanup scenarios or when you need to reset the observer
1311    /// state without creating a new property instance.
1312    ///
1313    /// # Returns
1314    ///
1315    /// `Ok(())` if successful
1316    ///
1317    /// # Examples
1318    ///
1319    /// ```rust
1320    /// use observable_property_tokio::ObservableProperty;
1321    /// use std::sync::Arc;
1322    ///
1323    /// #[tokio::main]
1324    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1325    ///     let property = ObservableProperty::new(42);
1326    ///
1327    ///     // Register some observers
1328    ///     property.subscribe(Arc::new(|old, new| {
1329    ///         println!("Value changed from {} to {}", old, new);
1330    ///     }))?;
1331    ///
1332    ///     assert_eq!(property.observer_count(), 1);
1333    ///
1334    ///     // Clear all observers
1335    ///     property.clear_observers()?;
1336    ///     assert_eq!(property.observer_count(), 0);
1337    ///
1338    ///     // Setting value now won't trigger any observers
1339    ///     property.set(100)?;
1340    ///
1341    ///     Ok(())
1342    /// }
1343    /// ```
1344    pub fn clear_observers(&self) -> Result<(), PropertyError> {
1345        let mut inner = self.inner.write();
1346        inner.observers.clear();
1347        Ok(())
1348    }
1349
1350    /// Performs cleanup operations on this property
1351    ///
1352    /// This method clears all registered observers, effectively shutting down
1353    /// the property's observer functionality. This is particularly useful in
1354    /// production environments where you need to ensure proper resource cleanup
1355    /// during application shutdown or when disposing of property instances.
1356    ///
1357    /// Currently, this method performs the same operation as `clear_observers()`,
1358    /// but it's provided as a separate method to allow for future expansion
1359    /// of cleanup operations (such as canceling pending async tasks).
1360    ///
1361    /// # Returns
1362    ///
1363    /// `Ok(())` if successful
1364    ///
1365    /// # Examples
1366    ///
1367    /// ```rust
1368    /// use observable_property_tokio::ObservableProperty;
1369    /// use std::sync::Arc;
1370    ///
1371    /// #[tokio::main]
1372    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1373    ///     let property = ObservableProperty::new("active".to_string());
1374    ///
1375    ///     // Register observers for normal operation
1376    ///     property.subscribe(Arc::new(|old, new| {
1377    ///         println!("Status changed: {} -> {}", old, new);
1378    ///     }))?;
1379    ///
1380    ///     // ... normal application usage ...
1381    ///
1382    ///     // Shutdown the property when done
1383    ///     property.shutdown()?;
1384    ///
1385    ///     // Property can still be used for getting/setting values,
1386    ///     // but no observers will be notified
1387    ///     property.set("inactive".to_string())?;
1388    ///
1389    ///     Ok(())
1390    /// }
1391    /// ```
1392    pub fn shutdown(&self) -> Result<(), PropertyError> {
1393        // Cancel any pending async observers
1394        self.clear_observers()
1395    }
1396
1397    /// Shutdown the property with a timeout, waiting for pending async operations
1398    ///
1399    /// This method performs a comprehensive shutdown that:
1400    /// 1. Clears all observers
1401    /// 2. Waits for a grace period to allow pending async operations to complete
1402    /// 3. Returns a detailed report about the shutdown process
1403    ///
1404    /// # Arguments
1405    ///
1406    /// * `timeout` - Maximum duration to wait for shutdown to complete
1407    ///
1408    /// # Returns
1409    ///
1410    /// `Ok(ShutdownReport)` containing shutdown metrics and diagnostics
1411    ///
1412    /// # Examples
1413    ///
1414    /// ```
1415    /// use observable_property_tokio::ObservableProperty;
1416    /// use std::sync::Arc;
1417    /// use std::time::Duration;
1418    ///
1419    /// #[tokio::main]
1420    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1421    ///     let property = ObservableProperty::new(0);
1422    ///     
1423    ///     // Add some async observers
1424    ///     property.subscribe_async(|_, new| async move {
1425    ///         println!("Async observer: {}", new);
1426    ///     })?;
1427    ///     
1428    ///     property.subscribe(Arc::new(|_, new| {
1429    ///         println!("Sync observer: {}", new);
1430    ///     }))?;
1431    ///     
1432    ///     // ... use property ...
1433    ///     
1434    ///     // Graceful shutdown with timeout
1435    ///     let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
1436    ///     
1437    ///     println!("Shutdown report: {}", report.diagnostic_info());
1438    ///     println!("Cleared {} observers in {:?}", 
1439    ///         report.observers_cleared, 
1440    ///         report.shutdown_duration);
1441    ///     
1442    ///     Ok(())
1443    /// }
1444    /// ```
1445    pub async fn shutdown_with_timeout(
1446        &self,
1447        timeout: std::time::Duration,
1448    ) -> Result<ShutdownReport, PropertyError> {
1449        use std::time::{SystemTime, UNIX_EPOCH, Instant};
1450        
1451        let start = Instant::now();
1452        let initiated_at_ms = SystemTime::now()
1453            .duration_since(UNIX_EPOCH)
1454            .map(|d| d.as_millis() as u64)
1455            .unwrap_or(0);
1456        
1457        // Get initial observer count before clearing
1458        let initial_count = self.observer_count();
1459        
1460        // Clear all observers
1461        self.clear_observers()?;
1462        
1463        // Wait for pending notifications with timeout
1464        let grace_period = timeout.min(std::time::Duration::from_millis(500));
1465        let completed_within_timeout = tokio::time::timeout(
1466            grace_period,
1467            tokio::time::sleep(grace_period)
1468        ).await.is_ok();
1469        
1470        let shutdown_duration = start.elapsed();
1471        
1472        Ok(ShutdownReport {
1473            observers_cleared: initial_count,
1474            shutdown_duration,
1475            completed_within_timeout,
1476            initiated_at_ms,
1477        })
1478    }
1479
1480    pub fn subscribe_with_token(&self, observer: Observer<T>) -> Result<Subscription<T>, PropertyError> {
1481        let id = self.subscribe(observer)?;
1482        Ok(Subscription {
1483            inner: Arc::clone(&self.inner),
1484            id
1485        })
1486    }
1487
1488    pub fn subscribe_filtered_with_token<F>(
1489        &self,
1490        observer: Observer<T>,
1491        filter: F,
1492    ) -> Result<Subscription<T>, PropertyError>
1493    where
1494        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1495    {
1496        let id = self.subscribe_filtered(observer, filter)?;
1497        Ok(Subscription {
1498            inner: Arc::clone(&self.inner),
1499            id
1500        })
1501    }
1502
1503    pub fn subscribe_async_with_token<F, Fut>(&self, handler: F) -> Result<Subscription<T>, PropertyError>
1504    where
1505        F: Fn(T, T) -> Fut + Send + Sync + 'static,
1506        Fut: std::future::Future<Output = ()> + Send + 'static,
1507    {
1508        let id = self.subscribe_async(handler)?;
1509        Ok(Subscription {
1510            inner: Arc::clone(&self.inner),
1511            id
1512        })
1513    }
1514
1515    pub fn subscribe_async_filtered_with_token<F, Fut, Filt>(
1516        &self,
1517        handler: F,
1518        filter: Filt,
1519    ) -> Result<Subscription<T>, PropertyError>
1520    where
1521        F: Fn(T, T) -> Fut + Send + Sync + 'static,
1522        Fut: std::future::Future<Output = ()> + Send + 'static,
1523        Filt: Fn(&T, &T) -> bool + Send + Sync + 'static,
1524    {
1525        let id = self.subscribe_async_filtered(handler, filter)?;
1526        Ok(Subscription {
1527            inner: Arc::clone(&self.inner),
1528            id
1529        })
1530    }
1531
1532}
1533
1534impl<T: Clone + Send + Sync + 'static + Default> Default for ObservableProperty<T> {
1535    /// Creates a new ObservableProperty with the default value for type T
1536    ///
1537    /// # Examples
1538    ///
1539    /// ```rust
1540    /// use observable_property_tokio::ObservableProperty;
1541    ///
1542    /// let property: ObservableProperty<i32> = Default::default();
1543    /// assert_eq!(property.get().unwrap(), 0); // Default for i32 is 0
1544    /// ```
1545    fn default() -> Self {
1546        Self::new(T::default())
1547    }
1548}
1549
1550impl<T: Clone> Clone for ObservableProperty<T> {
1551    /// Creates a new reference to the same observable property
1552    ///
1553    /// This creates a new `ObservableProperty` instance that shares the same
1554    /// underlying data with the original. Changes made through either instance
1555    /// will be visible to observers subscribed through both instances.
1556    ///
1557    /// # Examples
1558    ///
1559    /// ```rust
1560    /// use observable_property_tokio::ObservableProperty;
1561    /// use std::sync::Arc;
1562    ///
1563    /// #[tokio::main]
1564    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
1565    ///     let property1 = ObservableProperty::new(42);
1566    ///     let property2 = property1.clone();
1567    ///
1568    ///     property2.subscribe(Arc::new(|old, new| {
1569    ///         println!("Observer on property2 saw change: {} -> {}", old, new);
1570    ///     }))?;
1571    ///
1572    ///     // This change through property1 will trigger the observer on property2
1573    ///     property1.set(100)?;
1574    ///
1575    ///     Ok(())
1576    /// }
1577    /// ```
1578    fn clone(&self) -> Self {
1579        Self {
1580            inner: Arc::clone(&self.inner),
1581            config: self.config.clone(),
1582            async_task_semaphore: Arc::clone(&self.async_task_semaphore),
1583        }
1584    }
1585}
1586
1587impl<T: Clone + Send + Sync + 'static> Clone for PropertyHandle<T> {
1588    fn clone(&self) -> Self {
1589        Self {
1590            inner: Arc::clone(&self.inner),
1591        }
1592    }
1593}
1594
1595impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1596    /// Debug implementation that shows the current value if accessible
1597    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1598        match self.get() {
1599            Ok(value) => f.debug_struct("ObservableProperty")
1600                .field("value", &value)
1601                .field("observers_count", &self.observer_count())
1602                .finish(),
1603            Err(_) => f.debug_struct("ObservableProperty")
1604                .field("value", &"[inaccessible]")
1605                .field("observers_count", &self.observer_count())
1606                .finish(),
1607        }
1608    }
1609}
1610
1611#[cfg(test)]
1612mod tests {
1613    use super::*;
1614    use std::sync::atomic::{AtomicUsize, Ordering};
1615    use std::time::Duration;
1616    use tokio::time::sleep;
1617
1618    // Basic tests
1619    #[tokio::test]
1620    async fn test_new_and_get() -> Result<(), PropertyError> {
1621        let property = ObservableProperty::new(42);
1622        assert_eq!(property.get()?, 42);
1623        Ok(())
1624    }
1625
1626    #[tokio::test]
1627    async fn test_default() {
1628        let property: ObservableProperty<String> = Default::default();
1629        assert_eq!(property.get().unwrap(), String::default());
1630    }
1631
1632    #[tokio::test]
1633    async fn test_get_ref() {
1634        let property = ObservableProperty::new("hello".to_string());
1635        let value_ref = property.get_ref();
1636        assert_eq!(*value_ref, "hello");
1637    }
1638
1639    #[tokio::test]
1640    async fn test_set() -> Result<(), PropertyError> {
1641        let property = ObservableProperty::new(10);
1642        property.set(20)?;
1643        assert_eq!(property.get()?, 20);
1644        Ok(())
1645    }
1646
1647    #[tokio::test]
1648    async fn test_update() -> Result<(), PropertyError> {
1649        let property = ObservableProperty::new(10);
1650        property.update(|val| val * 2)?;
1651        assert_eq!(property.get()?, 20);
1652        Ok(())
1653    }
1654
1655    #[tokio::test]
1656    async fn test_update_async() -> Result<(), PropertyError> {
1657        let property = ObservableProperty::new(10);
1658        property.update_async(|val| val * 2).await?;
1659        assert_eq!(property.get()?, 20);
1660        Ok(())
1661    }
1662
1663    #[tokio::test]
1664    async fn test_map() -> Result<(), PropertyError> {
1665        let property = ObservableProperty::new(10);
1666        let derived = property.map(|val| val.to_string())?;
1667
1668        assert_eq!(derived.get()?, "10");
1669
1670        property.set(20)?;
1671        assert_eq!(derived.get()?, "20");
1672        Ok(())
1673    }
1674
1675    #[tokio::test]
1676    async fn test_set_async() -> Result<(), PropertyError> {
1677        let property = ObservableProperty::new("hello".to_string());
1678        property.set_async("world".to_string()).await?;
1679        assert_eq!(property.get()?, "world");
1680        Ok(())
1681    }
1682
1683    #[tokio::test]
1684    async fn test_clone() -> Result<(), PropertyError> {
1685        let property1 = ObservableProperty::new(100);
1686        let property2 = property1.clone();
1687
1688        // Change through property2
1689        property2.set(200)?;
1690
1691        // Both should reflect the change
1692        assert_eq!(property1.get()?, 200);
1693        assert_eq!(property2.get()?, 200);
1694        Ok(())
1695    }
1696
1697    // Observer tests
1698    #[tokio::test]
1699    async fn test_subscribe_and_notify() -> Result<(), PropertyError> {
1700        let property = ObservableProperty::new(0);
1701        let counter = Arc::new(AtomicUsize::new(0));
1702        let counter_clone = counter.clone();
1703
1704        property.subscribe(Arc::new(move |_, _| {
1705            counter_clone.fetch_add(1, Ordering::SeqCst);
1706        }))?;
1707
1708        property.set(1)?;
1709        property.set(2)?;
1710        property.set(3)?;
1711
1712        assert_eq!(counter.load(Ordering::SeqCst), 3);
1713        Ok(())
1714    }
1715
1716    #[tokio::test]
1717    async fn test_observer_count() -> Result<(), PropertyError> {
1718        let property = ObservableProperty::new(0);
1719        assert_eq!(property.observer_count(), 0);
1720
1721        let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1722        let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1723        assert_eq!(property.observer_count(), 2);
1724
1725        property.unsubscribe(id1)?;
1726        assert_eq!(property.observer_count(), 1);
1727
1728        property.unsubscribe(id2)?;
1729        assert_eq!(property.observer_count(), 0);
1730
1731        Ok(())
1732    }
1733
1734    #[tokio::test]
1735    async fn test_try_unsubscribe() -> Result<(), PropertyError> {
1736        let property = ObservableProperty::new(0);
1737        let id = property.subscribe(Arc::new(|_, _| {}))?;
1738
1739        assert!(property.try_unsubscribe(id));
1740        assert!(!property.try_unsubscribe(id));
1741
1742        Ok(())
1743    }
1744
1745    #[tokio::test]
1746    async fn test_subscribe_async() -> Result<(), PropertyError> {
1747        let property = ObservableProperty::new(0);
1748        let counter = Arc::new(AtomicUsize::new(0));
1749        let counter_clone = counter.clone();
1750
1751        property.subscribe_async(move |_, _| {
1752            let counter = counter_clone.clone();
1753            async move {
1754                // Simulate async work
1755                sleep(Duration::from_millis(10)).await;
1756                counter.fetch_add(1, Ordering::SeqCst);
1757            }
1758        })?;
1759
1760        property.set_async(1).await?;
1761        property.set_async(2).await?;
1762
1763        // Give time for async operations to complete
1764        sleep(Duration::from_millis(50)).await;
1765
1766        // Check counter after async operations complete
1767        assert_eq!(counter.load(Ordering::SeqCst), 2);
1768        Ok(())
1769    }
1770
1771    #[tokio::test]
1772    async fn test_subscribe_async_filtered() -> Result<(), PropertyError> {
1773        let property = ObservableProperty::new(0);
1774        let counter = Arc::new(AtomicUsize::new(0));
1775        let counter_clone = counter.clone();
1776
1777        property.subscribe_async_filtered(
1778            move |_, _| {
1779                let counter = counter_clone.clone();
1780                async move {
1781                    sleep(Duration::from_millis(10)).await;
1782                    counter.fetch_add(1, Ordering::SeqCst);
1783                }
1784            },
1785            |old, new| new > old
1786        )?;
1787
1788        property.set_async(10).await?; // Should trigger (0 -> 10)
1789        property.set_async(5).await?;  // Should NOT trigger (10 -> 5)
1790        property.set_async(15).await?; // Should trigger (5 -> 15)
1791
1792        // Give time for async operations to complete
1793        sleep(Duration::from_millis(50)).await;
1794
1795        // Only two updates should have triggered the observer
1796        assert_eq!(counter.load(Ordering::SeqCst), 2);
1797        Ok(())
1798    }
1799
1800    #[tokio::test]
1801    async fn test_multiple_observers() -> Result<(), PropertyError> {
1802        let property = ObservableProperty::new(0);
1803        let counter1 = Arc::new(AtomicUsize::new(0));
1804        let counter2 = Arc::new(AtomicUsize::new(0));
1805
1806        property.subscribe(Arc::new({
1807            let counter = counter1.clone();
1808            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1809        }))?;
1810
1811        property.subscribe(Arc::new({
1812            let counter = counter2.clone();
1813            move |_, _| { counter.fetch_add(2, Ordering::SeqCst); }
1814        }))?;
1815
1816        property.set(42)?;
1817
1818        assert_eq!(counter1.load(Ordering::SeqCst), 1);
1819        assert_eq!(counter2.load(Ordering::SeqCst), 2);
1820        Ok(())
1821    }
1822
1823    #[tokio::test]
1824    async fn test_unsubscribe() -> Result<(), PropertyError> {
1825        let property = ObservableProperty::new(0);
1826        let counter = Arc::new(AtomicUsize::new(0));
1827
1828        let id = property.subscribe(Arc::new({
1829            let counter = counter.clone();
1830            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1831        }))?;
1832
1833        property.set(1)?;
1834        assert_eq!(counter.load(Ordering::SeqCst), 1);
1835
1836        // Unsubscribe and verify it no longer receives updates
1837        property.unsubscribe(id)?;
1838
1839        // Set again, counter should not increase
1840        property.set(2)?;
1841        assert_eq!(counter.load(Ordering::SeqCst), 1);
1842
1843        // Try to unsubscribe again, should fail with ObserverNotFound
1844        match property.unsubscribe(id) {
1845            Err(PropertyError::ObserverNotFound { .. }) => {},
1846            other => panic!("Expected ObserverNotFound error, got {:?}", other),
1847        }
1848
1849        Ok(())
1850    }
1851
1852    // Filtered observer tests
1853    #[tokio::test]
1854    async fn test_filtered_observer() -> Result<(), PropertyError> {
1855        let property = ObservableProperty::new(0);
1856        let counter = Arc::new(AtomicUsize::new(0));
1857
1858        property.subscribe_filtered(
1859            Arc::new({
1860                let counter = counter.clone();
1861                move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1862            }),
1863            |old, new| new > old // Only trigger when value increases
1864        )?;
1865
1866        property.set(10)?; // Should trigger (0 -> 10)
1867        property.set(5)?;  // Should NOT trigger (10 -> 5)
1868        property.set(15)?; // Should trigger (5 -> 15)
1869
1870        assert_eq!(counter.load(Ordering::SeqCst), 2);
1871        Ok(())
1872    }
1873
1874    // Concurrent access tests
1875    #[tokio::test]
1876    async fn test_concurrent_modifications() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1877        let property = Arc::new(ObservableProperty::new(0));
1878        let final_counter = Arc::new(AtomicUsize::new(0));
1879
1880        // Subscribe to track the final value
1881        property.subscribe(Arc::new({
1882            let counter = final_counter.clone();
1883            move |_, new| {
1884                counter.store(*new, Ordering::SeqCst);
1885            }
1886        }))?;
1887
1888        // Create multiple tasks to update the property concurrently
1889        let mut tasks = vec![];
1890
1891        for i in 1..=5 {
1892            let prop = property.clone();
1893            let task = tokio::spawn(async move {
1894                prop.set(i).map_err(|e| format!("Failed to set property: {}", e))
1895            });
1896            tasks.push(task);
1897        }
1898
1899        // Wait for all tasks to complete
1900        for task in tasks {
1901            task.await??;
1902        }
1903
1904        // Final value should be one of the set values (1-5)
1905        let final_value = final_counter.load(Ordering::SeqCst);
1906        assert!(final_value >= 1 && final_value <= 5);
1907        Ok(())
1908    }
1909
1910    // Test for observer panic handling
1911    #[tokio::test]
1912    async fn test_observer_panic_handling() -> Result<(), PropertyError> {
1913        let property = ObservableProperty::new(0);
1914        let counter = Arc::new(AtomicUsize::new(0));
1915
1916        // First observer panics
1917        property.subscribe(Arc::new(|_, _| {
1918            panic!("This observer intentionally panics");
1919        }))?;
1920
1921        // Second observer should still run
1922        property.subscribe(Arc::new({
1923            let counter = counter.clone();
1924            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1925        }))?;
1926
1927        // This should not panic the test
1928        property.set(42)?;
1929
1930        // Second observer should have run
1931        assert_eq!(counter.load(Ordering::SeqCst), 1);
1932        Ok(())
1933    }
1934
1935    // More tests for new functionality
1936    #[tokio::test]
1937    async fn test_async_observers_with_async_set() -> Result<(), PropertyError> {
1938        let property = ObservableProperty::new(0);
1939        let counter = Arc::new(AtomicUsize::new(0));
1940
1941        // Register two types of observers - one sync, one async
1942        property.subscribe(Arc::new({
1943            let counter = counter.clone();
1944            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
1945        }))?;
1946
1947        let counter_clone = counter.clone();
1948        property.subscribe_async(move |_, _| {
1949            let counter = counter_clone.clone();
1950            async move {
1951                sleep(Duration::from_millis(10)).await;
1952                counter.fetch_add(1, Ordering::SeqCst);
1953            }
1954        })?;
1955
1956        // Using set_async should notify both observers
1957        property.set_async(42).await?;
1958
1959        // Give time for async observer to complete
1960        sleep(Duration::from_millis(50)).await;
1961
1962        // Both observers should have incremented the counter
1963        assert_eq!(counter.load(Ordering::SeqCst), 2);
1964        Ok(())
1965    }
1966
1967    // Test for stress with many observers
1968    #[tokio::test]
1969    async fn test_many_observers() -> Result<(), PropertyError> {
1970        let property = ObservableProperty::new(0);
1971        let counter = Arc::new(AtomicUsize::new(0));
1972
1973        // Add 100 observers
1974        for _ in 0..100 {
1975            property.subscribe(Arc::new({
1976                let counter = counter.clone();
1977                move |_, _| {
1978                    counter.fetch_add(1, Ordering::SeqCst);
1979                }
1980            }))?;
1981        }
1982
1983        // Trigger all observers
1984        property.set_async(999).await?;
1985
1986        // Wait for all to complete
1987        sleep(Duration::from_millis(100)).await;
1988
1989        // All 100 observers should have incremented the counter
1990        assert_eq!(counter.load(Ordering::SeqCst), 100);
1991        Ok(())
1992    }
1993
1994    // Test for correct old and new values in observers
1995    #[tokio::test]
1996    async fn test_observer_receives_correct_values() -> Result<(), PropertyError> {
1997        let property = ObservableProperty::new(100);
1998        let vals = Arc::new((AtomicUsize::new(0), AtomicUsize::new(0)));
1999
2000        property.subscribe(Arc::new({
2001            let vals = vals.clone();
2002            move |old, new| {
2003                vals.0.store(*old, Ordering::SeqCst);
2004                vals.1.store(*new, Ordering::SeqCst);
2005            }
2006        }))?;
2007
2008        property.set(200)?;
2009
2010        assert_eq!(vals.0.load(Ordering::SeqCst), 100);
2011        assert_eq!(vals.1.load(Ordering::SeqCst), 200);
2012        Ok(())
2013    }
2014
2015    // Test for complex data type
2016    #[derive(Debug, Clone, PartialEq)]
2017    struct Person {
2018        name: String,
2019        age: u32,
2020    }
2021
2022    #[tokio::test]
2023    async fn test_complex_data_type() -> Result<(), PropertyError> {
2024        let person1 = Person {
2025            name: "Alice".to_string(),
2026            age: 30,
2027        };
2028
2029        let person2 = Person {
2030            name: "Bob".to_string(),
2031            age: 25,
2032        };
2033
2034        let property = ObservableProperty::new(person1.clone());
2035        assert_eq!(property.get()?, person1);
2036
2037        let name_changes = Arc::new(AtomicUsize::new(0));
2038
2039        property.subscribe_filtered(
2040            Arc::new({
2041                let counter = name_changes.clone();
2042                move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2043            }),
2044            |old, new| old.name != new.name // Only notify on name changes
2045        )?;
2046
2047        // Update age only - shouldn't trigger
2048        let mut person3 = person1.clone();
2049        person3.age = 31;
2050        property.set(person3)?;
2051        assert_eq!(name_changes.load(Ordering::SeqCst), 0);
2052
2053        // Update name - should trigger
2054        property.set(person2)?;
2055        assert_eq!(name_changes.load(Ordering::SeqCst), 1);
2056        Ok(())
2057    }
2058
2059    // Test waiting for observers with proper async handling
2060    #[tokio::test]
2061    async fn test_waiting_for_observers() -> Result<(), PropertyError> {
2062        let property = ObservableProperty::new(0);
2063        let counter = Arc::new(AtomicUsize::new(0));
2064
2065        // Use subscribe_async instead of manually spawning tasks
2066        let counter_for_observer = counter.clone();
2067        property.subscribe_async(move |_, _| {
2068            let counter = counter_for_observer.clone();
2069            async move {
2070                sleep(Duration::from_millis(50)).await;
2071                counter.fetch_add(1, Ordering::SeqCst);
2072            }
2073        })?;
2074
2075        // Use the regular set_async method
2076        property.set_async(42).await?;
2077
2078        // Give sufficient time for async observers to complete
2079        sleep(Duration::from_millis(100)).await;
2080
2081        // Counter should be incremented after the async work completes
2082        assert_eq!(counter.load(Ordering::SeqCst), 1);
2083        Ok(())
2084    }
2085
2086    #[tokio::test]
2087    async fn test_subscription_auto_cleanup() -> Result<(), PropertyError> {
2088        let property = ObservableProperty::new(0);
2089        let counter = Arc::new(AtomicUsize::new(0));
2090
2091        {
2092            // Create a subscription in this scope
2093            let _subscription = property.subscribe_with_token(Arc::new({
2094                let counter = counter.clone();
2095                move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2096            }))?;
2097
2098            // Update should trigger the observer
2099            property.set(1)?;
2100            assert_eq!(counter.load(Ordering::SeqCst), 1);
2101
2102            // Subscription is still active within this scope
2103            property.set(2)?;
2104            assert_eq!(counter.load(Ordering::SeqCst), 2);
2105        } // _subscription is dropped here, should automatically unsubscribe
2106
2107        // After subscription is dropped, updates should not trigger the observer
2108        property.set(3)?;
2109        assert_eq!(counter.load(Ordering::SeqCst), 2); // Counter should not increment
2110
2111        Ok(())
2112    }
2113
2114    #[tokio::test]
2115    async fn test_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
2116        let property = ObservableProperty::new(0);
2117        let counter = Arc::new(AtomicUsize::new(0));
2118
2119        // Only notify when value increases
2120        let filter = |old: &i32, new: &i32| new > old;
2121
2122        {
2123            // Create a filtered subscription in this scope
2124            let _subscription = property.subscribe_filtered_with_token(
2125                Arc::new({
2126                    let counter = counter.clone();
2127                    move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2128                }),
2129                filter
2130            )?;
2131
2132            property.set(10)?; // Should trigger (0 -> 10)
2133            assert_eq!(counter.load(Ordering::SeqCst), 1);
2134
2135            property.set(5)?; // Should NOT trigger (10 -> 5)
2136            assert_eq!(counter.load(Ordering::SeqCst), 1);
2137
2138            property.set(15)?; // Should trigger (5 -> 15)
2139            assert_eq!(counter.load(Ordering::SeqCst), 2);
2140        } // _subscription is dropped here, should automatically unsubscribe
2141
2142        // After subscription is dropped, updates should not trigger the observer
2143        property.set(20)?;
2144        assert_eq!(counter.load(Ordering::SeqCst), 2); // Counter should not increment
2145
2146        Ok(())
2147    }
2148
2149    #[tokio::test]
2150    async fn test_async_subscription_auto_cleanup() -> Result<(), PropertyError> {
2151        let property = ObservableProperty::new(0);
2152        let counter = Arc::new(AtomicUsize::new(0));
2153        let counter_clone = counter.clone(); // Clone before passing to closure
2154
2155        {
2156            // Create an async subscription in this scope
2157            let _subscription = property.subscribe_async_with_token(move |_, _| {
2158                let counter = counter_clone.clone(); // Use counter_clone instead of counter
2159                async move {
2160                    sleep(Duration::from_millis(10)).await;
2161                    counter.fetch_add(1, Ordering::SeqCst);
2162                }
2163            })?;
2164
2165            property.set_async(1).await?;
2166
2167            // Give time for async operations to complete
2168            sleep(Duration::from_millis(50)).await;
2169            assert_eq!(counter.load(Ordering::SeqCst), 1);
2170        } // _subscription is dropped here, should automatically unsubscribe
2171
2172        // After subscription is dropped, updates should not trigger the observer
2173        property.set_async(2).await?;
2174
2175        // Give time for any potential async operations to complete
2176        sleep(Duration::from_millis(50)).await;
2177        assert_eq!(counter.load(Ordering::SeqCst), 1); // Counter should not increment
2178
2179        Ok(())
2180    }
2181
2182    #[tokio::test]
2183    async fn test_async_filtered_subscription_auto_cleanup() -> Result<(), PropertyError> {
2184        let property = ObservableProperty::new(0);
2185        let counter = Arc::new(AtomicUsize::new(0));
2186        let counter_clone = counter.clone(); // Clone before passing to closure
2187
2188        {
2189            // Create an async filtered subscription in this scope
2190            let _subscription = property.subscribe_async_filtered_with_token(
2191                move |_, _| {
2192                    let counter = counter_clone.clone(); // Use counter_clone instead of counter
2193                    async move {
2194                        sleep(Duration::from_millis(10)).await;
2195                        counter.fetch_add(1, Ordering::SeqCst);
2196                    }
2197                },
2198                |old, new| new > old // Only trigger when value increases
2199            )?;
2200
2201            property.set_async(10).await?; // Should trigger (0 -> 10)
2202
2203            // Give time for async operations to complete
2204            sleep(Duration::from_millis(50)).await;
2205            assert_eq!(counter.load(Ordering::SeqCst), 1);
2206
2207            property.set_async(5).await?; // Should NOT trigger (10 -> 5)
2208            sleep(Duration::from_millis(50)).await;
2209            assert_eq!(counter.load(Ordering::SeqCst), 1);
2210        } // _subscription is dropped here, should automatically unsubscribe
2211
2212        // After subscription is dropped, updates should not trigger the observer
2213        property.set_async(15).await?; // Would have triggered with active subscription
2214
2215        // Give time for any potential async operations to complete
2216        sleep(Duration::from_millis(50)).await;
2217        assert_eq!(counter.load(Ordering::SeqCst), 1); // Counter should not increment
2218
2219        Ok(())
2220    }
2221
2222    #[tokio::test]
2223    async fn test_multiple_subscriptions() -> Result<(), PropertyError> {
2224        let property = ObservableProperty::new(0);
2225        let counter1 = Arc::new(AtomicUsize::new(0));
2226        let counter2 = Arc::new(AtomicUsize::new(0));
2227
2228        // First subscription
2229        let subscription1 = property.subscribe_with_token(Arc::new({
2230            let counter = counter1.clone();
2231            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2232        }))?;
2233
2234        // Second subscription
2235        let subscription2 = property.subscribe_with_token(Arc::new({
2236            let counter = counter2.clone();
2237            move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2238        }))?;
2239
2240        // Both subscriptions should receive updates
2241        property.set(1)?;
2242        assert_eq!(counter1.load(Ordering::SeqCst), 1);
2243        assert_eq!(counter2.load(Ordering::SeqCst), 1);
2244
2245        // Drop first subscription only
2246        drop(subscription1);
2247
2248        // Only the second subscription should receive updates now
2249        property.set(2)?;
2250        assert_eq!(counter1.load(Ordering::SeqCst), 1); // Should not increment
2251        assert_eq!(counter2.load(Ordering::SeqCst), 2); // Should increment
2252
2253        // Drop second subscription
2254        drop(subscription2);
2255
2256        // No subscriptions should receive updates now
2257        property.set(3)?;
2258        assert_eq!(counter1.load(Ordering::SeqCst), 1);
2259        assert_eq!(counter2.load(Ordering::SeqCst), 2);
2260
2261        Ok(())
2262    }
2263
2264    #[tokio::test]
2265    async fn test_subscription_with_property_drop() -> Result<(), PropertyError> {
2266        // Create property in a scope so it can be dropped
2267        let counter = Arc::new(AtomicUsize::new(0));
2268        let subscription;
2269
2270        {
2271            let property = ObservableProperty::new(0);
2272
2273            // Create subscription
2274            subscription = property.subscribe_with_token(Arc::new({
2275                let counter = counter.clone();
2276                move |_, _| { counter.fetch_add(1, Ordering::SeqCst); }
2277            }))?;
2278
2279            // Subscription works normally
2280            property.set(1)?;
2281            assert_eq!(counter.load(Ordering::SeqCst), 1);
2282        } // property is dropped here, but subscription is still alive
2283
2284        // Subscription should be aware that property is gone when we drop it
2285        // This should not panic or cause any issues
2286        drop(subscription);
2287
2288        Ok(())
2289    }
2290
2291    // Test cleanup methods
2292    #[tokio::test]
2293    async fn test_cleanup_methods() -> Result<(), PropertyError> {
2294        let property = ObservableProperty::new(42);
2295        let counter = Arc::new(AtomicUsize::new(0));
2296
2297        // Subscribe multiple observers
2298        let counter1 = counter.clone();
2299        property.subscribe(Arc::new(move |_, _| {
2300            counter1.fetch_add(1, Ordering::SeqCst);
2301        }))?;
2302
2303        let counter2 = counter.clone();
2304        property.subscribe_async(move |_, _| {
2305            let counter = counter2.clone();
2306            async move {
2307                counter.fetch_add(1, Ordering::SeqCst);
2308            }
2309        })?;
2310
2311        assert_eq!(property.observer_count(), 2);
2312
2313        // Test clear_observers
2314        property.clear_observers()?;
2315        assert_eq!(property.observer_count(), 0);
2316
2317        // Setting value should not trigger any observers
2318        property.set(100)?;
2319        assert_eq!(counter.load(Ordering::SeqCst), 0);
2320
2321        // Re-subscribe to test shutdown
2322        let counter3 = counter.clone();
2323        property.subscribe(Arc::new(move |_, _| {
2324            counter3.fetch_add(1, Ordering::SeqCst);
2325        }))?;
2326
2327        assert_eq!(property.observer_count(), 1);
2328
2329        // Test shutdown method
2330        property.shutdown()?;
2331        assert_eq!(property.observer_count(), 0);
2332
2333        // Setting value should not trigger any observers after shutdown
2334        property.set(200)?;
2335        assert_eq!(counter.load(Ordering::SeqCst), 0);
2336
2337        Ok(())
2338    }
2339
2340    // Test backpressure and configuration functionality
2341    #[tokio::test]
2342    async fn test_property_config_default() {
2343        let config = PropertyConfig::default();
2344        assert_eq!(config.max_observers, 1000);
2345        assert_eq!(config.max_pending_notifications, 100);
2346        assert_eq!(config.observer_timeout_ms, 5000);
2347    }
2348
2349    #[tokio::test]
2350    async fn test_property_with_custom_config() -> Result<(), PropertyError> {
2351        let config = PropertyConfig {
2352            max_observers: 5,
2353            max_pending_notifications: 10,
2354            observer_timeout_ms: 1000,
2355            max_concurrent_async_tasks: 100,
2356        };
2357
2358        let property = ObservableProperty::new_with_config(42, config);
2359        assert_eq!(property.get()?, 42);
2360
2361        // Should be able to add up to max_observers
2362        for i in 0..5 {
2363            property.subscribe(Arc::new(move |_, _| {
2364                println!("Observer {}", i);
2365            }))?;
2366        }
2367
2368        assert_eq!(property.observer_count(), 5);
2369        Ok(())
2370    }
2371
2372    #[tokio::test]
2373    async fn test_observer_capacity_limit() -> Result<(), PropertyError> {
2374        let config = PropertyConfig {
2375            max_observers: 3,
2376            max_pending_notifications: 100,
2377            observer_timeout_ms: 5000,
2378            max_concurrent_async_tasks: 100,
2379        };
2380
2381        let property = ObservableProperty::new_with_config(0, config);
2382
2383        // Add observers up to limit
2384        property.subscribe(Arc::new(|_, _| {}))?;
2385        property.subscribe(Arc::new(|_, _| {}))?;
2386        property.subscribe(Arc::new(|_, _| {}))?;
2387
2388        assert_eq!(property.observer_count(), 3);
2389
2390        // Next subscribe should fail with CapacityExceeded
2391        let result = property.subscribe(Arc::new(|_, _| {}));
2392        assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2393
2394        if let Err(PropertyError::CapacityExceeded { current, max, resource }) = result {
2395            assert_eq!(current, 3);
2396            assert_eq!(max, 3);
2397            assert_eq!(resource, "observers");
2398        }
2399
2400        Ok(())
2401    }
2402
2403    #[tokio::test]
2404    async fn test_observer_capacity_after_unsubscribe() -> Result<(), PropertyError> {
2405        let config = PropertyConfig {
2406            max_observers: 2,
2407            max_pending_notifications: 100,
2408            observer_timeout_ms: 5000,
2409            max_concurrent_async_tasks: 100,
2410        };
2411
2412        let property = ObservableProperty::new_with_config(0, config);
2413
2414        // Add observers up to limit
2415        let id1 = property.subscribe(Arc::new(|_, _| {}))?;
2416        let _id2 = property.subscribe(Arc::new(|_, _| {}))?;
2417
2418        assert_eq!(property.observer_count(), 2);
2419
2420        // Next subscribe should fail
2421        assert!(property.subscribe(Arc::new(|_, _| {})).is_err());
2422
2423        // Unsubscribe one observer
2424        property.unsubscribe(id1)?;
2425        assert_eq!(property.observer_count(), 1);
2426
2427        // Now we should be able to add another observer
2428        let _id3 = property.subscribe(Arc::new(|_, _| {}))?;
2429        assert_eq!(property.observer_count(), 2);
2430
2431        // But not beyond the limit
2432        assert!(property.subscribe(Arc::new(|_, _| {})).is_err());
2433
2434        Ok(())
2435    }
2436
2437    #[tokio::test]
2438    async fn test_async_observer_capacity_limit() -> Result<(), PropertyError> {
2439        let config = PropertyConfig {
2440            max_observers: 2,
2441            max_pending_notifications: 100,
2442            observer_timeout_ms: 5000,
2443            max_concurrent_async_tasks: 100,
2444        };
2445
2446        let property = ObservableProperty::new_with_config(0, config);
2447
2448        // Add async observers up to limit
2449        property.subscribe_async(|_, _| async move {
2450            sleep(Duration::from_millis(10)).await;
2451        })?;
2452
2453        property.subscribe_async(|_, _| async move {
2454            sleep(Duration::from_millis(10)).await;
2455        })?;
2456
2457        assert_eq!(property.observer_count(), 2);
2458
2459        // Next subscribe should fail
2460        let result = property.subscribe_async(|_, _| async move {});
2461        assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2462
2463        Ok(())
2464    }
2465
2466    #[tokio::test]
2467    async fn test_filtered_observer_capacity_limit() -> Result<(), PropertyError> {
2468        let config = PropertyConfig {
2469            max_observers: 2,
2470            max_pending_notifications: 100,
2471            observer_timeout_ms: 5000,
2472            max_concurrent_async_tasks: 100,
2473        };
2474
2475        let property = ObservableProperty::new_with_config(0, config);
2476
2477        // Add filtered observers up to limit
2478        property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)?;
2479        property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)?;
2480
2481        assert_eq!(property.observer_count(), 2);
2482
2483        // Next subscribe should fail
2484        let result = property.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true);
2485        assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2486
2487        Ok(())
2488    }
2489
2490    #[tokio::test]
2491    async fn test_capacity_error_diagnostic() {
2492        let error = PropertyError::CapacityExceeded {
2493            current: 100,
2494            max: 100,
2495            resource: "observers".to_string(),
2496        };
2497
2498        let diagnostic = error.diagnostic_info();
2499        assert!(diagnostic.contains("CAPACITY_EXCEEDED"));
2500        assert!(diagnostic.contains("resource=observers"));
2501        assert!(diagnostic.contains("current=100"));
2502        assert!(diagnostic.contains("max=100"));
2503        assert!(diagnostic.contains("utilization=100.0%"));
2504    }
2505
2506    #[tokio::test]
2507    async fn test_cloned_property_shares_config() -> Result<(), PropertyError> {
2508        let config = PropertyConfig {
2509            max_observers: 3,
2510            max_pending_notifications: 100,
2511            observer_timeout_ms: 5000,
2512            max_concurrent_async_tasks: 100,
2513        };
2514
2515        let property1 = ObservableProperty::new_with_config(0, config);
2516        let property2 = property1.clone();
2517
2518        // Add observers through both properties
2519        property1.subscribe(Arc::new(|_, _| {}))?;
2520        property2.subscribe(Arc::new(|_, _| {}))?;
2521        property1.subscribe(Arc::new(|_, _| {}))?;
2522
2523        // Both should show 3 observers since they share the same inner state
2524        assert_eq!(property1.observer_count(), 3);
2525        assert_eq!(property2.observer_count(), 3);
2526
2527        // Next subscribe should fail on either property
2528        assert!(property1.subscribe(Arc::new(|_, _| {})).is_err());
2529        assert!(property2.subscribe(Arc::new(|_, _| {})).is_err());
2530
2531        Ok(())
2532    }
2533
2534    #[tokio::test]
2535    async fn test_subscription_token_with_capacity() -> Result<(), PropertyError> {
2536        let config = PropertyConfig {
2537            max_observers: 2,
2538            max_pending_notifications: 100,
2539            observer_timeout_ms: 5000,
2540            max_concurrent_async_tasks: 100,
2541        };
2542
2543        let property = ObservableProperty::new_with_config(0, config);
2544
2545        // Create subscriptions with tokens
2546        let _sub1 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2547        let _sub2 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2548
2549        assert_eq!(property.observer_count(), 2);
2550
2551        // Next subscribe should fail
2552        let result = property.subscribe_with_token(Arc::new(|_, _| {}));
2553        assert!(matches!(result, Err(PropertyError::CapacityExceeded { .. })));
2554
2555        // Drop one subscription
2556        drop(_sub1);
2557
2558        // Now we should be able to add another
2559        let _sub3 = property.subscribe_with_token(Arc::new(|_, _| {}))?;
2560        assert_eq!(property.observer_count(), 2);
2561
2562        Ok(())
2563    }
2564
2565    // Test error diagnostic functionality
2566    #[tokio::test]
2567    async fn test_error_diagnostic_info() {
2568        // Test ReadLockError
2569        let read_error = PropertyError::read_lock_error("get_value", "acquiring read lock failed");
2570        let diagnostic = read_error.diagnostic_info();
2571        assert!(diagnostic.contains("READ_LOCK_ERROR"));
2572        assert!(diagnostic.contains("operation=get_value"));
2573        assert!(diagnostic.contains("context=acquiring read lock failed"));
2574        assert!(diagnostic.contains("timestamp_ms="));
2575
2576        // Test WriteLockError
2577        let write_error = PropertyError::write_lock_error("set_value", "acquiring write lock failed");
2578        let diagnostic = write_error.diagnostic_info();
2579        assert!(diagnostic.contains("WRITE_LOCK_ERROR"));
2580        assert!(diagnostic.contains("operation=set_value"));
2581
2582        // Test LockPoisoned
2583        let poisoned_error = PropertyError::lock_poisoned("notify", "inner lock poisoned");
2584        let diagnostic = poisoned_error.diagnostic_info();
2585        assert!(diagnostic.contains("LOCK_POISONED"));
2586        assert!(diagnostic.contains("operation=notify"));
2587        assert!(diagnostic.contains("context=inner lock poisoned"));
2588
2589        // Test ObserverPanic
2590        let panic_error = PropertyError::observer_panic(ObserverId(42), "observer crashed");
2591        let diagnostic = panic_error.diagnostic_info();
2592        assert!(diagnostic.contains("OBSERVER_PANIC"));
2593        assert!(diagnostic.contains("observer_id=42"));
2594        assert!(diagnostic.contains("error=observer crashed"));
2595
2596        // Test ObserverNotFound
2597        let not_found_error = PropertyError::ObserverNotFound { id: ObserverId(99) };
2598        let diagnostic = not_found_error.diagnostic_info();
2599        assert!(diagnostic.contains("OBSERVER_NOT_FOUND"));
2600        assert!(diagnostic.contains("id=99"));
2601
2602        // Test CapacityExceeded
2603        let capacity_error = PropertyError::CapacityExceeded {
2604            current: 150,
2605            max: 100,
2606            resource: "observers".to_string(),
2607        };
2608        let diagnostic = capacity_error.diagnostic_info();
2609        assert!(diagnostic.contains("CAPACITY_EXCEEDED"));
2610        assert!(diagnostic.contains("resource=observers"));
2611        assert!(diagnostic.contains("current=150"));
2612        assert!(diagnostic.contains("max=100"));
2613        assert!(diagnostic.contains("utilization=150.0%"));
2614
2615        // Test OperationTimeout
2616        let timeout_error = PropertyError::OperationTimeout {
2617            operation: "notify_all".to_string(),
2618            elapsed_ms: 5500,
2619            threshold_ms: 5000,
2620        };
2621        let diagnostic = timeout_error.diagnostic_info();
2622        assert!(diagnostic.contains("OPERATION_TIMEOUT"));
2623        assert!(diagnostic.contains("operation=notify_all"));
2624        assert!(diagnostic.contains("elapsed_ms=5500"));
2625        assert!(diagnostic.contains("threshold_ms=5000"));
2626        assert!(diagnostic.contains("overage_ms=500"));
2627
2628        // Test ShutdownInProgress
2629        let shutdown_error = PropertyError::ShutdownInProgress;
2630        let diagnostic = shutdown_error.diagnostic_info();
2631        assert!(diagnostic.contains("SHUTDOWN_IN_PROGRESS"));
2632
2633        // Test ObserverError
2634        let observer_error = PropertyError::ObserverError {
2635            reason: "callback failed".to_string(),
2636        };
2637        let diagnostic = observer_error.diagnostic_info();
2638        assert!(diagnostic.contains("OBSERVER_ERROR"));
2639        assert!(diagnostic.contains("reason=callback failed"));
2640
2641        // Test TokioError
2642        let tokio_error = PropertyError::TokioError {
2643            reason: "runtime unavailable".to_string(),
2644        };
2645        let diagnostic = tokio_error.diagnostic_info();
2646        assert!(diagnostic.contains("TOKIO_ERROR"));
2647        assert!(diagnostic.contains("reason=runtime unavailable"));
2648
2649        // Test JoinError
2650        let join_error = PropertyError::JoinError("task panicked".to_string());
2651        let diagnostic = join_error.diagnostic_info();
2652        assert!(diagnostic.contains("JOIN_ERROR"));
2653        assert!(diagnostic.contains("message=task panicked"));
2654    }
2655
2656    #[tokio::test]
2657    async fn test_error_helper_functions_with_timestamp() {
2658        use std::time::{SystemTime, UNIX_EPOCH};
2659        
2660        let before = SystemTime::now()
2661            .duration_since(UNIX_EPOCH)
2662            .unwrap()
2663            .as_millis() as u64;
2664
2665        // Create errors using helper functions
2666        let read_error = PropertyError::read_lock_error("test_op", "test_context");
2667        let write_error = PropertyError::write_lock_error("test_op", "test_context");
2668        let poisoned_error = PropertyError::lock_poisoned("test_op", "test_context");
2669        let panic_error = PropertyError::observer_panic(ObserverId(1), "test_panic");
2670
2671        let after = SystemTime::now()
2672            .duration_since(UNIX_EPOCH)
2673            .unwrap()
2674            .as_millis() as u64;
2675
2676        // Verify timestamps are reasonable (within the time window of test execution)
2677        match read_error {
2678            PropertyError::ReadLockError { timestamp_ms, .. } => {
2679                assert!(timestamp_ms >= before && timestamp_ms <= after);
2680            }
2681            _ => panic!("Expected ReadLockError"),
2682        }
2683
2684        match write_error {
2685            PropertyError::WriteLockError { timestamp_ms, .. } => {
2686                assert!(timestamp_ms >= before && timestamp_ms <= after);
2687            }
2688            _ => panic!("Expected WriteLockError"),
2689        }
2690
2691        match poisoned_error {
2692            PropertyError::LockPoisoned { timestamp_ms, .. } => {
2693                assert!(timestamp_ms >= before && timestamp_ms <= after);
2694            }
2695            _ => panic!("Expected LockPoisoned"),
2696        }
2697
2698        match panic_error {
2699            PropertyError::ObserverPanic { timestamp_ms, .. } => {
2700                assert!(timestamp_ms >= before && timestamp_ms <= after);
2701            }
2702            _ => panic!("Expected ObserverPanic"),
2703        }
2704    }
2705
2706    #[tokio::test]
2707    async fn test_error_display_formatting() {
2708        let timeout_error = PropertyError::OperationTimeout {
2709            operation: "test_operation".to_string(),
2710            elapsed_ms: 1500,
2711            threshold_ms: 1000,
2712        };
2713        let display = format!("{}", timeout_error);
2714        assert!(display.contains("test_operation"));
2715        assert!(display.contains("1500ms"));
2716        assert!(display.contains("1000ms"));
2717
2718        let capacity_error = PropertyError::CapacityExceeded {
2719            current: 200,
2720            max: 100,
2721            resource: "test_resource".to_string(),
2722        };
2723        let display = format!("{}", capacity_error);
2724        assert!(display.contains("200"));
2725        assert!(display.contains("100"));
2726        assert!(display.contains("test_resource"));
2727    }
2728
2729    #[tokio::test]
2730    async fn test_capacity_exceeded_utilization_calculation() {
2731        let error = PropertyError::CapacityExceeded {
2732            current: 75,
2733            max: 100,
2734            resource: "observers".to_string(),
2735        };
2736        let diagnostic = error.diagnostic_info();
2737        assert!(diagnostic.contains("utilization=75.0%"));
2738
2739        let error2 = PropertyError::CapacityExceeded {
2740            current: 100,
2741            max: 100,
2742            resource: "observers".to_string(),
2743        };
2744        let diagnostic2 = error2.diagnostic_info();
2745        assert!(diagnostic2.contains("utilization=100.0%"));
2746    }
2747
2748    // Test graceful shutdown functionality
2749    #[tokio::test]
2750    async fn test_shutdown_with_timeout_basic() -> Result<(), PropertyError> {
2751        use std::time::Duration;
2752        
2753        let property = ObservableProperty::new(0);
2754        let counter = Arc::new(AtomicUsize::new(0));
2755        
2756        // Add some observers
2757        let counter1 = counter.clone();
2758        property.subscribe(Arc::new(move |_, _| {
2759            counter1.fetch_add(1, Ordering::SeqCst);
2760        }))?;
2761        
2762        let counter2 = counter.clone();
2763        property.subscribe_async(move |_, _| {
2764            let counter = counter2.clone();
2765            async move {
2766                counter.fetch_add(1, Ordering::SeqCst);
2767            }
2768        })?;
2769        
2770        assert_eq!(property.observer_count(), 2);
2771        
2772        // Perform shutdown with timeout
2773        let report = property.shutdown_with_timeout(Duration::from_secs(5)).await?;
2774        
2775        // Verify report
2776        assert_eq!(report.observers_cleared, 2);
2777        assert!(report.shutdown_duration.as_secs() < 5);
2778        assert!(report.completed_within_timeout);
2779        assert!(report.initiated_at_ms > 0);
2780        
2781        // Verify observers were cleared
2782        assert_eq!(property.observer_count(), 0);
2783        
2784        // Setting value should not trigger observers
2785        property.set(42)?;
2786        assert_eq!(counter.load(Ordering::SeqCst), 0);
2787        
2788        Ok(())
2789    }
2790
2791    #[tokio::test]
2792    async fn test_shutdown_report_diagnostic() -> Result<(), PropertyError> {
2793        use std::time::Duration;
2794        
2795        let property = ObservableProperty::new(100);
2796        
2797        // Add multiple observers
2798        for _ in 0..5 {
2799            property.subscribe(Arc::new(|_, _| {}))?;
2800        }
2801        
2802        let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2803        
2804        let diagnostic = report.diagnostic_info();
2805        assert!(diagnostic.contains("SHUTDOWN_COMPLETE"));
2806        assert!(diagnostic.contains("observers_cleared=5"));
2807        assert!(diagnostic.contains("within_timeout=true"));
2808        assert!(diagnostic.contains("initiated_at_ms="));
2809        
2810        Ok(())
2811    }
2812
2813    #[tokio::test]
2814    async fn test_shutdown_with_async_observers() -> Result<(), PropertyError> {
2815        use std::time::Duration;
2816        
2817        let property = ObservableProperty::new(0);
2818        let counter = Arc::new(AtomicUsize::new(0));
2819        
2820        // Add async observers that take some time
2821        let counter1 = counter.clone();
2822        property.subscribe_async(move |_, _| {
2823            let counter = counter1.clone();
2824            async move {
2825                tokio::time::sleep(Duration::from_millis(50)).await;
2826                counter.fetch_add(1, Ordering::SeqCst);
2827            }
2828        })?;
2829        
2830        let counter2 = counter.clone();
2831        property.subscribe_async(move |_, _| {
2832            let counter = counter2.clone();
2833            async move {
2834                tokio::time::sleep(Duration::from_millis(50)).await;
2835                counter.fetch_add(1, Ordering::SeqCst);
2836            }
2837        })?;
2838        
2839        // Trigger observers
2840        property.set_async(42).await?;
2841        
2842        // Give time for async operations to start
2843        tokio::time::sleep(Duration::from_millis(10)).await;
2844        
2845        // Shutdown with enough timeout for operations to complete
2846        let report = property.shutdown_with_timeout(Duration::from_secs(2)).await?;
2847        
2848        assert_eq!(report.observers_cleared, 2);
2849        assert!(report.completed_within_timeout);
2850        
2851        Ok(())
2852    }
2853
2854    #[tokio::test]
2855    async fn test_shutdown_idempotent() -> Result<(), PropertyError> {
2856        use std::time::Duration;
2857        
2858        let property = ObservableProperty::new("test");
2859        
2860        property.subscribe(Arc::new(|_, _| {}))?;
2861        property.subscribe(Arc::new(|_, _| {}))?;
2862        
2863        // First shutdown
2864        let report1 = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2865        assert_eq!(report1.observers_cleared, 2);
2866        
2867        // Second shutdown should still succeed but clear 0 observers
2868        let report2 = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2869        assert_eq!(report2.observers_cleared, 0);
2870        
2871        Ok(())
2872    }
2873
2874    #[tokio::test]
2875    async fn test_shutdown_vs_shutdown_with_timeout() -> Result<(), PropertyError> {
2876        use std::time::Duration;
2877        
2878        // Test regular shutdown
2879        let property1 = ObservableProperty::new(0);
2880        property1.subscribe(Arc::new(|_, _| {}))?;
2881        property1.subscribe(Arc::new(|_, _| {}))?;
2882        
2883        property1.shutdown()?;
2884        assert_eq!(property1.observer_count(), 0);
2885        
2886        // Test shutdown with timeout
2887        let property2 = ObservableProperty::new(0);
2888        property2.subscribe(Arc::new(|_, _| {}))?;
2889        property2.subscribe(Arc::new(|_, _| {}))?;
2890        
2891        let report = property2.shutdown_with_timeout(Duration::from_secs(1)).await?;
2892        assert_eq!(property2.observer_count(), 0);
2893        assert_eq!(report.observers_cleared, 2);
2894        
2895        Ok(())
2896    }
2897
2898    #[tokio::test]
2899    async fn test_shutdown_report_timing() -> Result<(), PropertyError> {
2900        use std::time::{Duration, Instant};
2901        
2902        let property = ObservableProperty::new(0);
2903        
2904        // Add several observers
2905        for _ in 0..10 {
2906            property.subscribe(Arc::new(|_, _| {}))?;
2907        }
2908        
2909        let start = Instant::now();
2910        let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2911        let elapsed = start.elapsed();
2912        
2913        // Shutdown should complete reasonably quickly
2914        assert!(elapsed < Duration::from_secs(2));
2915        assert!(report.shutdown_duration <= elapsed);
2916        assert_eq!(report.observers_cleared, 10);
2917        
2918        Ok(())
2919    }
2920
2921    #[tokio::test]
2922    async fn test_shutdown_with_filtered_and_async_observers() -> Result<(), PropertyError> {
2923        use std::time::Duration;
2924        
2925        let property = ObservableProperty::new(0);
2926        
2927        // Mix of different observer types
2928        property.subscribe(Arc::new(|_, _| {}))?;
2929        property.subscribe_async(|_, _| async {})?;
2930        property.subscribe_filtered(Arc::new(|_, _| {}), |_, new| new % 2 == 0)?;
2931        property.subscribe_async_filtered(|_, _| async {}, |_, new| new > &0)?;
2932        
2933        assert_eq!(property.observer_count(), 4);
2934        
2935        let report = property.shutdown_with_timeout(Duration::from_secs(1)).await?;
2936        
2937        assert_eq!(report.observers_cleared, 4);
2938        assert_eq!(property.observer_count(), 0);
2939        
2940        Ok(())
2941    }
2942
2943    // Batching tests
2944    #[tokio::test]
2945    async fn test_batched_property_creation() -> Result<(), PropertyError> {
2946        let property = BatchedProperty::new(42);
2947        assert_eq!(property.get()?, 42);
2948        
2949        let config = BatchConfig {
2950            batch_interval: std::time::Duration::from_millis(50),
2951        };
2952        let property2 = BatchedProperty::new_with_config(100, config);
2953        assert_eq!(property2.get()?, 100);
2954        
2955        Ok(())
2956    }
2957
2958    #[tokio::test]
2959    async fn test_batched_property_queue_update() -> Result<(), PropertyError> {
2960        use std::time::Duration;
2961        
2962        let property = BatchedProperty::new(0);
2963        let counter = Arc::new(AtomicUsize::new(0));
2964        let last_value = Arc::new(RwLock::new(0));
2965        
2966        // Subscribe to updates
2967        property.subscribe(Arc::new({
2968            let counter = counter.clone();
2969            let last_value = last_value.clone();
2970            move |_, new| {
2971                counter.fetch_add(1, Ordering::SeqCst);
2972                *last_value.write() = *new;
2973            }
2974        }))?;
2975        
2976        // Queue multiple updates rapidly
2977        for i in 1..=10 {
2978            property.queue_update(i)?;
2979        }
2980        
2981        // Wait for batch to flush (default is 100ms)
2982        tokio::time::sleep(Duration::from_millis(150)).await;
2983        
2984        // Should have been notified only once with the last value
2985        assert_eq!(counter.load(Ordering::SeqCst), 1);
2986        assert_eq!(*last_value.read(), 10);
2987        assert_eq!(property.get()?, 10);
2988        
2989        Ok(())
2990    }
2991
2992    #[tokio::test]
2993    async fn test_batched_property_multiple_batches() -> Result<(), PropertyError> {
2994        use std::time::Duration;
2995        
2996        let config = BatchConfig {
2997            batch_interval: Duration::from_millis(50),
2998        };
2999        let property = BatchedProperty::new_with_config(0, config);
3000        let counter = Arc::new(AtomicUsize::new(0));
3001        
3002        property.subscribe(Arc::new({
3003            let counter = counter.clone();
3004            move |_, _| {
3005                counter.fetch_add(1, Ordering::SeqCst);
3006            }
3007        }))?;
3008        
3009        // First batch
3010        for i in 1..=5 {
3011            property.queue_update(i)?;
3012        }
3013        tokio::time::sleep(Duration::from_millis(75)).await;
3014        
3015        // Second batch
3016        for i in 6..=10 {
3017            property.queue_update(i)?;
3018        }
3019        tokio::time::sleep(Duration::from_millis(75)).await;
3020        
3021        // Should have been notified twice (once per batch)
3022        assert_eq!(counter.load(Ordering::SeqCst), 2);
3023        assert_eq!(property.get()?, 10);
3024        
3025        Ok(())
3026    }
3027
3028    #[tokio::test]
3029    async fn test_batched_property_set_immediate() -> Result<(), PropertyError> {
3030        use std::time::Duration;
3031        
3032        let property = BatchedProperty::new(0);
3033        let counter = Arc::new(AtomicUsize::new(0));
3034        
3035        property.subscribe(Arc::new({
3036            let counter = counter.clone();
3037            move |_, _| {
3038                counter.fetch_add(1, Ordering::SeqCst);
3039            }
3040        }))?;
3041        
3042        // Queue some updates
3043        property.queue_update(5)?;
3044        property.queue_update(10)?;
3045        
3046        // Set immediately - should notify right away
3047        property.set_immediate(42)?;
3048        assert_eq!(counter.load(Ordering::SeqCst), 1);
3049        assert_eq!(property.get()?, 42);
3050        
3051        // Wait for batch - queued updates should be cleared by set_immediate
3052        tokio::time::sleep(Duration::from_millis(150)).await;
3053        
3054        // Counter should still be 1 (no additional notification from batch)
3055        // Note: This behavior depends on timing, but set_immediate should have priority
3056        assert!(counter.load(Ordering::SeqCst) >= 1);
3057        
3058        Ok(())
3059    }
3060
3061    #[tokio::test]
3062    async fn test_batched_property_flush() -> Result<(), PropertyError> {
3063        let property = BatchedProperty::new(0);
3064        let counter = Arc::new(AtomicUsize::new(0));
3065        
3066        property.subscribe(Arc::new({
3067            let counter = counter.clone();
3068            move |_, _| {
3069                counter.fetch_add(1, Ordering::SeqCst);
3070            }
3071        }))?;
3072        
3073        // Queue updates
3074        property.queue_update(42)?;
3075        
3076        // Flush immediately
3077        property.flush().await?;
3078        
3079        // Should be notified immediately
3080        assert_eq!(counter.load(Ordering::SeqCst), 1);
3081        assert_eq!(property.get()?, 42);
3082        
3083        Ok(())
3084    }
3085
3086    #[tokio::test]
3087    async fn test_batched_property_async_observer() -> Result<(), PropertyError> {
3088        use std::time::Duration;
3089        
3090        let property = BatchedProperty::new(0);
3091        let counter = Arc::new(AtomicUsize::new(0));
3092        
3093        property.subscribe_async({
3094            let counter = counter.clone();
3095            move |_, _| {
3096                let counter = counter.clone();
3097                async move {
3098                    tokio::time::sleep(Duration::from_millis(10)).await;
3099                    counter.fetch_add(1, Ordering::SeqCst);
3100                }
3101            }
3102        })?;
3103        
3104        // Queue multiple updates
3105        for i in 1..=5 {
3106            property.queue_update(i)?;
3107        }
3108        
3109        // Wait for batch and async processing
3110        tokio::time::sleep(Duration::from_millis(200)).await;
3111        
3112        // Should have been notified once
3113        assert_eq!(counter.load(Ordering::SeqCst), 1);
3114        
3115        Ok(())
3116    }
3117
3118    #[tokio::test]
3119    async fn test_batched_property_observer_management() -> Result<(), PropertyError> {
3120        let property = BatchedProperty::new(0);
3121        
3122        // Add observers
3123        let id1 = property.subscribe(Arc::new(|_, _| {}))?;
3124        let _id2 = property.subscribe(Arc::new(|_, _| {}))?;
3125        assert_eq!(property.observer_count(), 2);
3126        
3127        // Unsubscribe one
3128        property.unsubscribe(id1)?;
3129        assert_eq!(property.observer_count(), 1);
3130        
3131        // Clear all
3132        property.clear_observers()?;
3133        assert_eq!(property.observer_count(), 0);
3134        
3135        Ok(())
3136    }
3137
3138    #[tokio::test]
3139    async fn test_batched_property_clone() -> Result<(), PropertyError> {
3140        use std::time::Duration;
3141        
3142        let property1 = BatchedProperty::new(0);
3143        let property2 = property1.clone();
3144        
3145        let counter = Arc::new(AtomicUsize::new(0));
3146        
3147        // Subscribe on clone
3148        property2.subscribe(Arc::new({
3149            let counter = counter.clone();
3150            move |_, _| {
3151                counter.fetch_add(1, Ordering::SeqCst);
3152            }
3153        }))?;
3154        
3155        // Update on original
3156        property1.queue_update(42)?;
3157        
3158        // Wait for batch
3159        tokio::time::sleep(Duration::from_millis(150)).await;
3160        
3161        // Observer should be notified
3162        assert_eq!(counter.load(Ordering::SeqCst), 1);
3163        assert_eq!(property1.get()?, 42);
3164        assert_eq!(property2.get()?, 42);
3165        
3166        Ok(())
3167    }
3168
3169    #[tokio::test]
3170    async fn test_batched_property_high_frequency() -> Result<(), PropertyError> {
3171        use std::time::Duration;
3172        
3173        let config = BatchConfig {
3174            batch_interval: Duration::from_millis(100),
3175        };
3176        let property = BatchedProperty::new_with_config(0, config);
3177        let counter = Arc::new(AtomicUsize::new(0));
3178        
3179        property.subscribe(Arc::new({
3180            let counter = counter.clone();
3181            move |_, _| {
3182                counter.fetch_add(1, Ordering::SeqCst);
3183            }
3184        }))?;
3185        
3186        // Queue 1000 updates rapidly
3187        for i in 1..=1000 {
3188            property.queue_update(i)?;
3189        }
3190        
3191        // Wait for batch
3192        tokio::time::sleep(Duration::from_millis(150)).await;
3193        
3194        // Should have been notified only once despite 1000 updates
3195        assert_eq!(counter.load(Ordering::SeqCst), 1);
3196        assert_eq!(property.get()?, 1000);
3197        
3198        Ok(())
3199    }
3200}
3201
/// Settings that govern how a batched property flushes queued updates
///
/// The single knob, `batch_interval`, determines how often queued updates are
/// handed to observers, which keeps notification overhead down when a property
/// changes at a high rate.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::BatchConfig;
/// use std::time::Duration;
///
/// let config = BatchConfig {
///     batch_interval: Duration::from_millis(100),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Time between two consecutive flushes of queued updates
    ///
    /// Larger values mean fewer notifications at the price of higher latency;
    /// smaller values deliver updates sooner but notify more often.
    ///
    /// Default: 100ms
    pub batch_interval: std::time::Duration,
}

impl Default for BatchConfig {
    /// Returns a configuration that flushes every 100 milliseconds.
    fn default() -> Self {
        let batch_interval = std::time::Duration::from_millis(100);
        BatchConfig { batch_interval }
    }
}
3235
/// A batched wrapper around ObservableProperty that reduces notification overhead
///
/// This type collects property updates and only notifies observers at regular intervals,
/// which is useful for high-frequency update scenarios where you want to reduce the
/// number of observer notifications.
///
/// Only the most recently queued value is kept between two flushes: queuing a new
/// value replaces whatever was queued before it.
///
/// # Examples
///
/// ```
/// use observable_property_tokio::{BatchedProperty, BatchConfig};
/// use std::time::Duration;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
///     // Create a batched property with 100ms batching interval
///     let config = BatchConfig {
///         batch_interval: Duration::from_millis(100),
///     };
///
///     let property = BatchedProperty::new_with_config(0, config);
///
///     // Subscribe to batched updates
///     property.subscribe(Arc::new(|old, new| {
///         println!("Batched update: {} -> {}", old, new);
///     }))?;
///
///     // Queue multiple updates rapidly
///     for i in 1..=100 {
///         property.queue_update(i)?;
///     }
///
///     // Observers will only be notified once per batch interval
///     // with the latest value
///
///     // Wait for batch to flush
///     tokio::time::sleep(Duration::from_millis(150)).await;
///
///     Ok(())
/// }
/// ```
pub struct BatchedProperty<T: Clone + Send + Sync + 'static> {
    // Wrapped property that stores the current value and the registered observers;
    // reads, immediate writes and observer management are forwarded to it.
    inner: ObservableProperty<T>,
    // Most recently queued value waiting for the next flush; `None` when nothing
    // is queued. Shared with the background batch task.
    pending_update: Arc<RwLock<Option<T>>>,
    // Handle of the background task spawned by `new_with_config`. It is only stored
    // here and not read by any code visible in this part of the file.
    _batch_task: Arc<tokio::task::JoinHandle<()>>,
}
3282
3283impl<T: Clone + Send + Sync + 'static> BatchedProperty<T> {
3284    /// Create a new batched property with default configuration
3285    ///
3286    /// Uses a batch interval of 100ms by default.
3287    ///
3288    /// # Arguments
3289    ///
3290    /// * `initial_value` - The starting value for the property
3291    ///
3292    /// # Examples
3293    ///
3294    /// ```
3295    /// use observable_property_tokio::BatchedProperty;
3296    ///
3297    /// #[tokio::main]
3298    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3299    ///     let property = BatchedProperty::new(42);
3300    ///     assert_eq!(property.get()?, 42);
3301    ///     Ok(())
3302    /// }
3303    /// ```
3304    pub fn new(initial_value: T) -> Self {
3305        Self::new_with_config(initial_value, BatchConfig::default())
3306    }
3307
3308    /// Create a new batched property with custom configuration
3309    ///
3310    /// # Arguments
3311    ///
3312    /// * `initial_value` - The starting value for the property
3313    /// * `config` - Batch configuration controlling flush interval
3314    ///
3315    /// # Examples
3316    ///
3317    /// ```
3318    /// use observable_property_tokio::{BatchedProperty, BatchConfig};
3319    /// use std::time::Duration;
3320    ///
3321    /// #[tokio::main]
3322    /// async fn main() {
3323    ///     let config = BatchConfig {
3324    ///         batch_interval: Duration::from_millis(50),
3325    ///     };
3326    ///
3327    ///     let property = BatchedProperty::new_with_config(0, config);
3328    /// }
3329    /// ```
3330    pub fn new_with_config(initial_value: T, config: BatchConfig) -> Self {
3331        let inner = ObservableProperty::new(initial_value);
3332        let pending_update = Arc::new(RwLock::new(None));
3333
3334        // Spawn batch processor task
3335        let inner_clone = inner.clone();
3336        let pending_clone = pending_update.clone();
3337        let batch_interval = config.batch_interval;
3338
3339        let batch_task = tokio::spawn(async move {
3340            let mut interval = tokio::time::interval(batch_interval);
3341            loop {
3342                interval.tick().await;
3343                
3344                // Check if there's a pending update
3345                let update = {
3346                    let mut pending = pending_clone.write();
3347                    pending.take()
3348                };
3349
3350                // If there's an update, apply it
3351                if let Some(value) = update {
3352                    let _ = inner_clone.set_async(value).await;
3353                }
3354            }
3355        });
3356
3357        Self {
3358            inner,
3359            pending_update,
3360            _batch_task: Arc::new(batch_task),
3361        }
3362    }
3363
3364    /// Queue an update to be batched
3365    ///
3366    /// The update will be held until the next batch interval, at which point
3367    /// only the most recent queued value will be applied and observers notified.
3368    ///
3369    /// # Arguments
3370    ///
3371    /// * `value` - The new value to queue
3372    ///
3373    /// # Returns
3374    ///
3375    /// `Ok(())` if the update was queued successfully
3376    ///
3377    /// # Examples
3378    ///
3379    /// ```
3380    /// use observable_property_tokio::BatchedProperty;
3381    /// use std::time::Duration;
3382    ///
3383    /// #[tokio::main]
3384    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3385    ///     let property = BatchedProperty::new(0);
3386    ///     
3387    ///     // Queue multiple updates
3388    ///     for i in 1..=10 {
3389    ///         property.queue_update(i)?;
3390    ///     }
3391    ///     
3392    ///     // Wait for batch to flush
3393    ///     tokio::time::sleep(Duration::from_millis(150)).await;
3394    ///     
3395    ///     // Property will have the last queued value
3396    ///     assert_eq!(property.get()?, 10);
3397    ///     
3398    ///     Ok(())
3399    /// }
3400    /// ```
3401    pub fn queue_update(&self, value: T) -> Result<(), PropertyError> {
3402        *self.pending_update.write() = Some(value);
3403        Ok(())
3404    }
3405
3406    /// Set the value immediately, bypassing batching
3407    ///
3408    /// This will apply the update immediately and notify observers synchronously,
3409    /// without waiting for the next batch interval.
3410    ///
3411    /// # Arguments
3412    ///
3413    /// * `value` - The new value to set
3414    ///
3415    /// # Examples
3416    ///
3417    /// ```
3418    /// use observable_property_tokio::BatchedProperty;
3419    ///
3420    /// # #[tokio::main]
3421    /// # async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3422    /// let property = BatchedProperty::new(0);
3423    /// property.set_immediate(42)?;
3424    /// assert_eq!(property.get()?, 42);
3425    /// # Ok(())
3426    /// # }
3427    /// ```
3428    pub fn set_immediate(&self, value: T) -> Result<(), PropertyError> {
3429        self.inner.set(value)
3430    }
3431
    /// Set the value immediately using async notification
    ///
    /// Forwards `value` to the wrapped property's `set_async` and awaits it, so the
    /// update is applied now rather than at the next batch interval.
    ///
    /// Like `set_immediate`, this does not touch the pending-update slot; a value
    /// queued with `queue_update` stays queued.
    ///
    /// # Arguments
    ///
    /// * `value` - The new value to set
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `set_async` reports.
    pub async fn set_immediate_async(&self, value: T) -> Result<(), PropertyError> {
        self.inner.set_async(value).await
    }
3443
    /// Get the current value
    ///
    /// Reads the value from the wrapped property. A value that has only been
    /// queued (and not yet applied) is not consulted by this method.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `get` reports.
    ///
    /// # Examples
    ///
    /// ```
    /// use observable_property_tokio::BatchedProperty;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    ///     let property = BatchedProperty::new(42);
    ///     assert_eq!(property.get()?, 42);
    ///     Ok(())
    /// }
    /// ```
    pub fn get(&self) -> Result<T, PropertyError> {
        self.inner.get()
    }
3461
    /// Subscribe to batched property changes
    ///
    /// The observer is registered directly on the wrapped property, so it is
    /// invoked whenever that property is updated — at batch intervals with the
    /// latest queued value, and also by `set_immediate`, `set_immediate_async`
    /// and `flush`, which all write to the same wrapped property.
    ///
    /// # Arguments
    ///
    /// * `observer` - Callback function to handle property changes
    ///
    /// # Returns
    ///
    /// `Ok(ObserverId)` containing a unique identifier for this observer
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `subscribe` reports.
    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
        self.inner.subscribe(observer)
    }
3476
    /// Subscribe with an async handler
    ///
    /// The handler is registered on the wrapped property via its
    /// `subscribe_async`; this method adds no behavior of its own.
    ///
    /// # Arguments
    ///
    /// * `handler` - Function taking two `T` values and returning a future that
    ///   resolves to `()`. Both the function and the future must be `Send` and
    ///   `'static`; the function must also be `Sync`.
    ///   (Presumably the two arguments are the old and new value, as for
    ///   synchronous observers — confirm against the wrapped property.)
    ///
    /// # Returns
    ///
    /// `Ok(ObserverId)` identifying the registered handler.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `subscribe_async` reports.
    pub fn subscribe_async<F, Fut>(&self, handler: F) -> Result<ObserverId, PropertyError>
    where
        F: Fn(T, T) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        self.inner.subscribe_async(handler)
    }
3485
    /// Unsubscribe an observer
    ///
    /// Forwards `id` to the wrapped property's `unsubscribe`.
    ///
    /// # Arguments
    ///
    /// * `id` - Identifier previously returned by one of the subscribe methods
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `unsubscribe` reports.
    pub fn unsubscribe(&self, id: ObserverId) -> Result<(), PropertyError> {
        self.inner.unsubscribe(id)
    }
3490
    /// Get the number of registered observers
    ///
    /// Returns the count reported by the wrapped property's `observer_count`.
    pub fn observer_count(&self) -> usize {
        self.inner.observer_count()
    }
3495
    /// Clear all observers
    ///
    /// Forwards to the wrapped property's `clear_observers`. The pending-update
    /// slot is not touched by this method.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped property's `clear_observers` reports.
    pub fn clear_observers(&self) -> Result<(), PropertyError> {
        self.inner.clear_observers()
    }
3500
3501    /// Flush any pending batched update immediately
3502    ///
3503    /// This forces any queued update to be applied right away,
3504    /// rather than waiting for the next batch interval.
3505    ///
3506    /// # Examples
3507    ///
3508    /// ```
3509    /// use observable_property_tokio::BatchedProperty;
3510    ///
3511    /// # #[tokio::main]
3512    /// # async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3513    /// let property = BatchedProperty::new(0);
3514    /// property.queue_update(42)?;
3515    /// property.flush().await?;
3516    /// assert_eq!(property.get()?, 42);
3517    /// # Ok(())
3518    /// # }
3519    /// ```
3520    pub async fn flush(&self) -> Result<(), PropertyError> {
3521        let update = {
3522            let mut pending = self.pending_update.write();
3523            pending.take()
3524        };
3525
3526        if let Some(value) = update {
3527            self.inner.set_async(value).await?;
3528        }
3529
3530        Ok(())
3531    }
3532}
3533
3534impl<T: Clone + Send + Sync + 'static> Clone for BatchedProperty<T> {
3535    fn clone(&self) -> Self {
3536        Self {
3537            inner: self.inner.clone(),
3538            pending_update: Arc::clone(&self.pending_update),
3539            _batch_task: Arc::clone(&self._batch_task),
3540        }
3541    }
3542}
3543
3544impl From<JoinError> for PropertyError {
3545    /// Convert a Tokio JoinError into a PropertyError
3546    ///
3547    /// This enables using the `?` operator directly on `task::spawn(...).await`
3548    /// which returns a `Result<T, JoinError>`.
3549    ///
3550    /// # Examples
3551    ///
3552    /// ```rust
3553    /// use observable_property_tokio::ObservableProperty;
3554    /// use std::sync::Arc;
3555    /// use tokio::task;
3556    ///
3557    /// #[tokio::main]
3558    /// async fn main() -> Result<(), observable_property_tokio::PropertyError> {
3559    ///     let property = Arc::new(ObservableProperty::new(0));
3560    ///     let property_clone = property.clone();
3561    ///
3562    ///     // This task::spawn can now use ?? to propagate both types of errors
3563    ///     task::spawn(async move {
3564    ///         property_clone.set(42)?;
3565    ///         Ok::<_, observable_property_tokio::PropertyError>(())
3566    ///     }).await??;
3567    ///
3568    ///     Ok(())
3569    /// }
3570    /// ```
3571    fn from(err: JoinError) -> Self {
3572        PropertyError::JoinError(err.to_string())
3573    }
3574}
3575
3576#[cfg(test)]
3577mod connection_pool_tests {
3578    use super::*;
3579    use std::sync::atomic::{AtomicUsize, Ordering};
3580    use tokio::time::{sleep, Duration};
3581
3582    #[tokio::test]
3583    async fn test_concurrent_task_limiting() -> Result<(), PropertyError> {
3584        let config = PropertyConfig {
3585            max_observers: 1000,
3586            max_pending_notifications: 1000,
3587            observer_timeout_ms: 5000,
3588            max_concurrent_async_tasks: 5, // Only 5 concurrent tasks allowed
3589        };
3590
3591        let property = ObservableProperty::new_with_config(0, config);
3592        let concurrent_count = Arc::new(AtomicUsize::new(0));
3593        let max_concurrent = Arc::new(AtomicUsize::new(0));
3594
3595        // Subscribe 20 async observers that all take 100ms to execute
3596        for _ in 0..20 {
3597            let counter = Arc::clone(&concurrent_count);
3598            let max_counter = Arc::clone(&max_concurrent);
3599
3600            property.subscribe_async(move |_, _| {
3601                let counter = Arc::clone(&counter);
3602                let max_counter = Arc::clone(&max_counter);
3603
3604                async move {
3605                    // Increment concurrent count
3606                    let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
3607
3608                    // Update max if needed
3609                    max_counter.fetch_max(current, Ordering::SeqCst);
3610
3611                    // Simulate work
3612                    sleep(Duration::from_millis(100)).await;
3613
3614                    // Decrement concurrent count
3615                    counter.fetch_sub(1, Ordering::SeqCst);
3616                }
3617            })?;
3618        }
3619
3620        // Trigger notification to all observers
3621        property.set_async(42).await?;
3622
3623        // Wait for all tasks to complete
3624        sleep(Duration::from_millis(500)).await;
3625
3626        // Verify max concurrent was not exceeded
3627        let max_reached = max_concurrent.load(Ordering::SeqCst);
3628        println!(
3629            "Max concurrent tasks: {} (limit: 5)",
3630            max_reached
3631        );
3632
3633        assert!(
3634            max_reached <= 5,
3635            "Expected max concurrent tasks <= 5, but got {}",
3636            max_reached
3637        );
3638
3639        Ok(())
3640    }
3641
3642    #[tokio::test]
3643    async fn test_semaphore_blocks_when_max_reached() -> Result<(), PropertyError> {
3644        let config = PropertyConfig {
3645            max_observers: 100,
3646            max_pending_notifications: 100,
3647            observer_timeout_ms: 5000,
3648            max_concurrent_async_tasks: 2, // Very low limit
3649        };
3650
3651        let property = ObservableProperty::new_with_config(0, config);
3652        let execution_order = Arc::new(parking_lot::RwLock::new(Vec::new()));
3653
3654        // Add 5 observers with delays to test blocking
3655        for i in 0..5 {
3656            let order = Arc::clone(&execution_order);
3657            property.subscribe_async(move |_, _| {
3658                let order = Arc::clone(&order);
3659                async move {
3660                    order.write().push((i, "start"));
3661                    sleep(Duration::from_millis(50)).await;
3662                    order.write().push((i, "end"));
3663                }
3664            })?;
3665        }
3666
3667        // Trigger notification
3668        property.set_async(100).await?;
3669
3670        // Wait for all to complete
3671        sleep(Duration::from_millis(300)).await;
3672
3673        let order = execution_order.read();
3674        println!("Execution order: {:?}", *order);
3675
3676        // Verify that we have start/end pairs
3677        assert_eq!(order.len(), 10, "Should have 5 start and 5 end events");
3678
3679        // Verify all tasks completed
3680        let starts = order.iter().filter(|(_, phase)| *phase == "start").count();
3681        let ends = order.iter().filter(|(_, phase)| *phase == "end").count();
3682        assert_eq!(starts, 5, "Should have 5 starts");
3683        assert_eq!(ends, 5, "Should have 5 ends");
3684
3685        Ok(())
3686    }
3687
3688    #[tokio::test]
3689    async fn test_permits_released_after_execution() -> Result<(), PropertyError> {
3690        let config = PropertyConfig {
3691            max_observers: 100,
3692            max_pending_notifications: 100,
3693            observer_timeout_ms: 5000,
3694            max_concurrent_async_tasks: 3,
3695        };
3696
3697        let property = ObservableProperty::new_with_config(0, config);
3698        let executions = Arc::new(AtomicUsize::new(0));
3699
3700        // Add 10 observers
3701        for _ in 0..10 {
3702            let counter = Arc::clone(&executions);
3703            property.subscribe_async(move |_, _| {
3704                let counter = Arc::clone(&counter);
3705                async move {
3706                    counter.fetch_add(1, Ordering::SeqCst);
3707                    sleep(Duration::from_millis(10)).await;
3708                }
3709            })?;
3710        }
3711
3712        // Multiple notifications to test permit reuse
3713        for _ in 0..3 {
3714            property.set_async(42).await?;
3715            sleep(Duration::from_millis(100)).await;
3716        }
3717
3718        // Verify all executions happened (10 observers * 3 notifications = 30)
3719        let total = executions.load(Ordering::SeqCst);
3720        assert_eq!(total, 30, "Expected 30 executions, got {}", total);
3721
3722        Ok(())
3723    }
3724
3725    #[tokio::test]
3726    async fn test_filtered_async_observers_respect_limit() -> Result<(), PropertyError> {
3727        let config = PropertyConfig {
3728            max_observers: 100,
3729            max_pending_notifications: 100,
3730            observer_timeout_ms: 5000,
3731            max_concurrent_async_tasks: 3,
3732        };
3733
3734        let property = ObservableProperty::new_with_config(0, config);
3735        let concurrent_count = Arc::new(AtomicUsize::new(0));
3736        let max_concurrent = Arc::new(AtomicUsize::new(0));
3737
3738        // Add 10 filtered async observers
3739        for _ in 0..10 {
3740            let counter = Arc::clone(&concurrent_count);
3741            let max_counter = Arc::clone(&max_concurrent);
3742
3743            property.subscribe_async_filtered(
3744                move |_, _| {
3745                    let counter = Arc::clone(&counter);
3746                    let max_counter = Arc::clone(&max_counter);
3747
3748                    async move {
3749                        let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
3750                        max_counter.fetch_max(current, Ordering::SeqCst);
3751                        sleep(Duration::from_millis(50)).await;
3752                        counter.fetch_sub(1, Ordering::SeqCst);
3753                    }
3754                },
3755                |_, &new| new % 2 == 0, // Only trigger on even values
3756            )?;
3757        }
3758
3759        // Trigger with even value
3760        property.set_async(100).await?;
3761        sleep(Duration::from_millis(200)).await;
3762
3763        let max_reached = max_concurrent.load(Ordering::SeqCst);
3764        println!("Max concurrent filtered async tasks: {}", max_reached);
3765
3766        assert!(
3767            max_reached <= 3,
3768            "Expected max concurrent tasks <= 3, got {}",
3769            max_reached
3770        );
3771
3772        Ok(())
3773    }
3774
3775    #[tokio::test]
3776    async fn test_default_concurrent_limit() -> Result<(), PropertyError> {
3777        // Default config should have max_concurrent_async_tasks = 100
3778        let property = ObservableProperty::new(0);
3779
3780        // Add 200 async observers
3781        for _ in 0..200 {
3782            property.subscribe_async(|_, _| async move {
3783                sleep(Duration::from_millis(50)).await;
3784            })?;
3785        }
3786
3787        // This should work without blocking indefinitely
3788        property.set_async(42).await?;
3789        sleep(Duration::from_millis(300)).await;
3790
3791        Ok(())
3792    }
3793
3794    #[tokio::test]
3795    async fn test_mixed_sync_and_async_observers() -> Result<(), PropertyError> {
3796        let config = PropertyConfig {
3797            max_observers: 100,
3798            max_pending_notifications: 100,
3799            observer_timeout_ms: 5000,
3800            max_concurrent_async_tasks: 2,
3801        };
3802
3803        let property = ObservableProperty::new_with_config(0, config);
3804        let sync_count = Arc::new(AtomicUsize::new(0));
3805        let async_count = Arc::new(AtomicUsize::new(0));
3806
3807        // Add sync observers (these are not limited by semaphore)
3808        for _ in 0..5 {
3809            let counter = Arc::clone(&sync_count);
3810            property.subscribe(Arc::new(move |_, _| {
3811                counter.fetch_add(1, Ordering::SeqCst);
3812            }))?;
3813        }
3814
3815        // Add async observers (these ARE limited by semaphore)
3816        for _ in 0..5 {
3817            let counter = Arc::clone(&async_count);
3818            property.subscribe_async(move |_, _| {
3819                let counter = Arc::clone(&counter);
3820                async move {
3821                    sleep(Duration::from_millis(20)).await;
3822                    counter.fetch_add(1, Ordering::SeqCst);
3823                }
3824            })?;
3825        }
3826
3827        property.set_async(100).await?;
3828        sleep(Duration::from_millis(150)).await;
3829
3830        // All sync observers should have executed immediately
3831        assert_eq!(sync_count.load(Ordering::SeqCst), 5);
3832
3833        // All async observers should have executed (even if limited)
3834        assert_eq!(async_count.load(Ordering::SeqCst), 5);
3835
3836        Ok(())
3837    }
3838}