observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
14//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
15//!
16//! ## Quick Start
17//!
18//! ```rust
19//! use observable_property::ObservableProperty;
20//! use std::sync::Arc;
21//!
22//! // Create an observable property
23//! let property = ObservableProperty::new(42);
24//!
25//! // Subscribe to changes
26//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
27//!     println!("Value changed from {} to {}", old_value, new_value);
28//! })).map_err(|e| {
29//!     eprintln!("Failed to subscribe: {}", e);
30//!     e
31//! })?;
32//!
33//! // Change the value (triggers observer)
34//! property.set(100).map_err(|e| {
35//!     eprintln!("Failed to set value: {}", e);
36//!     e
37//! })?;
38//!
39//! // Unsubscribe when done
40//! property.unsubscribe(observer_id).map_err(|e| {
41//!     eprintln!("Failed to unsubscribe: {}", e);
42//!     e
43//! })?;
44//! # Ok::<(), observable_property::PropertyError>(())
45//! ```
46//!
47//! ## Multi-threading Example
48//!
49//! ```rust
50//! use observable_property::ObservableProperty;
51//! use std::sync::Arc;
52//! use std::thread;
53//!
54//! let property = Arc::new(ObservableProperty::new(0));
55//! let property_clone = property.clone();
56//!
57//! // Subscribe from one thread
58//! property.subscribe(Arc::new(|old, new| {
59//!     println!("Value changed: {} -> {}", old, new);
60//! })).map_err(|e| {
61//!     eprintln!("Failed to subscribe: {}", e);
62//!     e
63//! })?;
64//!
65//! // Modify from another thread
66//! thread::spawn(move || {
67//!     if let Err(e) = property_clone.set(42) {
68//!         eprintln!("Failed to set value: {}", e);
69//!     }
70//! }).join().expect("Thread panicked");
71//! # Ok::<(), observable_property::PropertyError>(())
72//! ```
73//!
74//! ## RAII Subscriptions (Recommended)
75//!
76//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
77//!
78//! ```rust
79//! use observable_property::ObservableProperty;
80//! use std::sync::Arc;
81//!
82//! # fn main() -> Result<(), observable_property::PropertyError> {
83//! let property = ObservableProperty::new(0);
84//!
85//! {
86//!     // Create RAII subscription - automatically cleaned up when dropped
87//!     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
88//!         println!("Value changed: {} -> {}", old, new);
89//!     }))?;
90//!
91//!     property.set(42)?; // Prints: "Value changed: 0 -> 42"
92//!
93//!     // Subscription automatically unsubscribes when leaving this scope
94//! }
95//!
96//! // No observer active anymore
97//! property.set(100)?; // No output
98//! # Ok(())
99//! # }
100//! ```
101//!
102//! ## Filtered RAII Subscriptions
103//!
104//! Combine filtering with automatic cleanup for conditional monitoring:
105//!
106//! ```rust
107//! use observable_property::ObservableProperty;
108//! use std::sync::Arc;
109//!
110//! # fn main() -> Result<(), observable_property::PropertyError> {
111//! let temperature = ObservableProperty::new(20.0);
112//!
113//! {
114//!     // Monitor only significant temperature increases with automatic cleanup
115//!     let _heat_warning = temperature.subscribe_filtered_with_subscription(
116//!         Arc::new(|old, new| {
117//!             println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
118//!         }),
119//!         |old, new| new > old && (new - old) > 5.0
120//!     )?;
121//!
122//!     temperature.set(22.0)?; // No warning (only 2°C increase)
123//!     temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
124//!
125//!     // Subscription automatically cleaned up here
126//! }
127//!
128//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
129//! # Ok(())
130//! # }
131//! ```
132//!
133//! ## Subscription Management Comparison
134//!
135//! ```rust
136//! use observable_property::ObservableProperty;
137//! use std::sync::Arc;
138//!
139//! # fn main() -> Result<(), observable_property::PropertyError> {
140//! let property = ObservableProperty::new(0);
141//! let observer = Arc::new(|old: &i32, new: &i32| {
142//!     println!("Value: {} -> {}", old, new);
143//! });
144//!
145//! // Method 1: Manual subscription management
146//! let observer_id = property.subscribe(observer.clone())?;
147//! property.set(42)?;
148//! property.unsubscribe(observer_id)?; // Manual cleanup required
149//!
150//! // Method 2: RAII subscription management (recommended)
151//! {
152//!     let _subscription = property.subscribe_with_subscription(observer)?;
153//!     property.set(100)?;
154//!     // Automatic cleanup when _subscription goes out of scope
155//! }
156//! # Ok(())
157//! # }
158//! ```
159//!
160//! ## Advanced RAII Patterns
161//!
162//! Comprehensive example showing various RAII subscription patterns:
163//!
164//! ```rust
165//! use observable_property::ObservableProperty;
166//! use std::sync::Arc;
167//!
168//! # fn main() -> Result<(), observable_property::PropertyError> {
169//! // System monitoring example
170//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
171//! let memory_usage = ObservableProperty::new(1024); // MB
172//! let active_connections = ObservableProperty::new(0u32);
173//!
174//! // Conditional monitoring based on system state
175//! let high_load_monitoring = cpu_usage.get()? > 50.0;
176//!
177//! if high_load_monitoring {
178//!     // Critical system monitoring - active only during high load
179//!     let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
180//!         Arc::new(|old, new| {
181//!             println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
182//!         }),
183//!         |_, new| *new > 80.0
184//!     )?;
185//!
186//!     let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
187//!         Arc::new(|old, new| {
188//!             println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
189//!         }),
190//!         |_, new| *new > 8192 // > 8GB
191//!     )?;
192//!
193//!     // Simulate system load changes
194//!     cpu_usage.set(85.0)?;     // Would trigger critical alert
195//!     memory_usage.set(9216)?;  // Would trigger memory warning
196//!     
197//!     // All monitoring automatically stops when exiting this block
198//! }
199//!
200//! // Connection monitoring with scoped lifetime
201//! {
202//!     let _connection_monitor = active_connections.subscribe_with_subscription(
203//!         Arc::new(|old, new| {
204//!             if new > old {
205//!                 println!("📈 New connections: {} -> {}", old, new);
206//!             } else if new < old {
207//!                 println!("📉 Connections closed: {} -> {}", old, new);
208//!             }
209//!         })
210//!     )?;
211//!
212//!     active_connections.set(5)?;  // Prints: "📈 New connections: 0 -> 5"
213//!     active_connections.set(3)?;  // Prints: "📉 Connections closed: 5 -> 3"
214//!     active_connections.set(8)?;  // Prints: "📈 New connections: 3 -> 8"
215//!
216//!     // Connection monitoring automatically stops here
217//! }
218//!
219//! // No monitoring active anymore
220//! cpu_usage.set(95.0)?;         // No output
221//! memory_usage.set(10240)?;     // No output  
222//! active_connections.set(15)?;  // No output
223//!
224//! println!("All monitoring automatically cleaned up!");
225//! # Ok(())
226//! # }
227//! ```
228
229use std::collections::HashMap;
230use std::fmt;
231use std::panic;
232use std::sync::{Arc, RwLock};
233use std::thread;
234
235/// Maximum number of background threads used for asynchronous observer notifications
236///
237/// This constant controls the degree of parallelism when using `set_async()` to notify
238/// observers. The observer list is divided into batches, with each batch running in
239/// its own background thread, up to this maximum number of threads.
240///
241/// # Rationale
242///
243/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
244/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead  
245/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
246/// - **System Responsiveness**: Limits thread contention on multi-core systems
247///
248/// # Implementation Details
249///
250/// When `set_async()` is called:
251/// 1. All observers are collected into a snapshot
252/// 2. Observers are divided into `MAX_THREADS` batches (or fewer if there are fewer observers)
253/// 3. Each batch executes in its own `thread::spawn()` call
254/// 4. Observers within each batch are executed sequentially
255///
256/// For example, with 100 observers and `MAX_THREADS = 4`:
257/// - Batch 1: Observers 1-25 (Thread 1)
258/// - Batch 2: Observers 26-50 (Thread 2)  
259/// - Batch 3: Observers 51-75 (Thread 3)
260/// - Batch 4: Observers 76-100 (Thread 4)
261///
262/// # Tuning Considerations
263///
264/// This value can be adjusted based on your application's needs:
265/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
266/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
267/// - **Memory-constrained systems**: Lower values reduce thread overhead
268/// - **Real-time systems**: Lower values reduce scheduling unpredictability
269///
270/// # Thread Safety
271///
272/// This constant is used only during the batching calculation and does not affect
273/// the thread safety of the overall system.
274const MAX_THREADS: usize = 4;
275/// Errors that can occur when working with ObservableProperty
276#[derive(Debug, Clone)]
277pub enum PropertyError {
278    /// Failed to acquire a read lock on the property
279    ReadLockError {
280        /// Context describing what operation was being attempted
281        context: String,
282    },
283    /// Failed to acquire a write lock on the property  
284    WriteLockError {
285        /// Context describing what operation was being attempted
286        context: String,
287    },
288    /// Attempted to unsubscribe an observer that doesn't exist
289    ObserverNotFound {
290        /// The ID of the observer that wasn't found
291        id: usize,
292    },
293    /// The property's lock has been poisoned due to a panic in another thread
294    PoisonedLock,
295    /// An observer function encountered an error during execution
296    ObserverError {
297        /// Description of what went wrong
298        reason: String,
299    },
300    /// The thread pool for async notifications is exhausted
301    ThreadPoolExhausted,
302    /// Invalid configuration was provided
303    InvalidConfiguration {
304        /// Description of the invalid configuration
305        reason: String,
306    },
307}
308
309impl fmt::Display for PropertyError {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        match self {
312            PropertyError::ReadLockError { context } => {
313                write!(f, "Failed to acquire read lock: {}", context)
314            }
315            PropertyError::WriteLockError { context } => {
316                write!(f, "Failed to acquire write lock: {}", context)
317            }
318            PropertyError::ObserverNotFound { id } => {
319                write!(f, "Observer with ID {} not found", id)
320            }
321            PropertyError::PoisonedLock => {
322                write!(
323                    f,
324                    "Property is in a poisoned state due to a panic in another thread"
325                )
326            }
327            PropertyError::ObserverError { reason } => {
328                write!(f, "Observer execution failed: {}", reason)
329            }
330            PropertyError::ThreadPoolExhausted => {
331                write!(f, "Thread pool is exhausted and cannot spawn more observers")
332            }
333            PropertyError::InvalidConfiguration { reason } => {
334                write!(f, "Invalid configuration: {}", reason)
335            }
336        }
337    }
338}
339
340impl std::error::Error for PropertyError {}
341
342/// Function type for observers that get called when property values change
343pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
344
345/// Unique identifier for registered observers
346pub type ObserverId = usize;
347
348/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
349///
350/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
351/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
352/// `Drop` implementation automatically removes the associated observer from the property.
353///
354/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
355/// leaks in scenarios where observers might otherwise be forgotten.
356///
357/// # Type Requirements
358///
359/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
360/// - `Clone`: Required for observer notifications
361/// - `Send`: Required for transferring between threads  
362/// - `Sync`: Required for concurrent access from multiple threads
363/// - `'static`: Required for observer callbacks that may outlive the original scope
364///
365/// # Examples
366///
367/// ## Basic RAII Subscription
368///
369/// ```rust
370/// use observable_property::ObservableProperty;
371/// use std::sync::Arc;
372///
373/// # fn main() -> Result<(), observable_property::PropertyError> {
374/// let property = ObservableProperty::new(0);
375///
376/// {
377///     // Create subscription - observer is automatically registered
378///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
379///         println!("Value changed: {} -> {}", old, new);
380///     }))?;
381///
382///     property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
383///
384///     // When _subscription goes out of scope here, observer is automatically removed
385/// }
386///
387/// property.set(100)?; // No observer output - subscription was automatically cleaned up
388/// # Ok(())
389/// # }
390/// ```
391///
392/// ## Cross-Thread Subscription Management
393///
394/// ```rust
395/// use observable_property::ObservableProperty;
396/// use std::sync::Arc;
397/// use std::thread;
398///
399/// # fn main() -> Result<(), observable_property::PropertyError> {
400/// let property = Arc::new(ObservableProperty::new(0));
401/// let property_clone = property.clone();
402///
403/// // Create subscription in main thread
404/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
405///     println!("Observed: {} -> {}", old, new);
406/// }))?;
407///
408/// // Move subscription to another thread for cleanup
409/// let handle = thread::spawn(move || {
410///     // Subscription is still active here
411///     let _ = property_clone.set(42); // Will trigger observer
412///     
413///     // When subscription is dropped here (end of thread), observer is cleaned up
414///     drop(subscription);
415/// });
416///
417/// handle.join().unwrap();
418/// 
419/// // Observer is no longer active
420/// property.set(100)?; // No output
421/// # Ok(())
422/// # }
423/// ```
424///
425/// ## Conditional Scoped Subscriptions
426///
427/// ```rust
428/// use observable_property::ObservableProperty;
429/// use std::sync::Arc;
430///
431/// # fn main() -> Result<(), observable_property::PropertyError> {
432/// let counter = ObservableProperty::new(0);
433/// let debug_mode = true;
434///
435/// if debug_mode {
436///     let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
437///         println!("Debug: counter {} -> {}", old, new);
438///     }))?;
439///     
440///     counter.set(1)?; // Prints debug info
441///     counter.set(2)?; // Prints debug info
442///     
443///     // Debug subscription automatically cleaned up when exiting if block
444/// }
445///
446/// counter.set(3)?; // No debug output (subscription was cleaned up)
447/// # Ok(())
448/// # }
449/// ```
450///
451/// # Thread Safety
452///
453/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
454/// sent between threads and the automatic cleanup will work correctly even if the
455/// subscription is dropped from a different thread than where it was created.
456pub struct Subscription<T: Clone + Send + Sync + 'static> {
457    inner: Arc<RwLock<InnerProperty<T>>>,
458    id: ObserverId,
459}
460
461impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
462    /// Debug implementation that shows the subscription ID without exposing internals
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        f.debug_struct("Subscription")
465            .field("id", &self.id)
466            .field("inner", &"[ObservableProperty]")
467            .finish()
468    }
469}
470
471impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
472    /// Automatically removes the associated observer when the subscription is dropped
473    ///
474    /// This implementation provides automatic cleanup by removing the observer
475    /// from the property's observer list when the `Subscription` goes out of scope.
476    ///
477    /// # Error Handling
478    ///
479    /// If the property's lock is poisoned or inaccessible during cleanup, the error
480    /// is silently ignored using the `let _ = ...` pattern. This is intentional
481    /// because:
482    /// 1. Drop implementations should not panic
483    /// 2. If the property is poisoned, it's likely unusable anyway
484    /// 3. There's no meaningful way to handle cleanup errors in a destructor
485    ///
486    /// # Thread Safety
487    ///
488    /// This method is safe to call from any thread, even if the subscription
489    /// was created on a different thread.
490    fn drop(&mut self) {
491        let _ = self.inner.write().map(|mut prop| {
492            prop.observers.remove(&self.id);
493        });
494    }
495}
496
497/// A thread-safe observable property that notifies observers when its value changes
498///
499/// This type wraps a value of type `T` and allows multiple observers to be notified
500/// whenever the value is modified. All operations are thread-safe and can be called
501/// from multiple threads concurrently.
502///
503/// # Type Requirements
504///
505/// The generic type `T` must implement:
506/// - `Clone`: Required for returning values and passing them to observers
507/// - `Send`: Required for transferring between threads
508/// - `Sync`: Required for concurrent access from multiple threads  
509/// - `'static`: Required for observer callbacks that may outlive the original scope
510///
511/// # Examples
512///
513/// ```rust
514/// use observable_property::ObservableProperty;
515/// use std::sync::Arc;
516///
517/// let property = ObservableProperty::new("initial".to_string());
518///
519/// let observer_id = property.subscribe(Arc::new(|old, new| {
520///     println!("Changed from '{}' to '{}'", old, new);
521/// })).map_err(|e| {
522///     eprintln!("Failed to subscribe: {}", e);
523///     e
524/// })?;
525///
526/// property.set("updated".to_string()).map_err(|e| {
527///     eprintln!("Failed to set value: {}", e);
528///     e
529/// })?; // Prints: Changed from 'initial' to 'updated'
530///
531/// property.unsubscribe(observer_id).map_err(|e| {
532///     eprintln!("Failed to unsubscribe: {}", e);
533///     e
534/// })?;
535/// # Ok::<(), observable_property::PropertyError>(())
536/// ```
537pub struct ObservableProperty<T> {
538    inner: Arc<RwLock<InnerProperty<T>>>,
539    max_threads: usize,
540}
541
542struct InnerProperty<T> {
543    value: T,
544    observers: HashMap<ObserverId, Observer<T>>,
545    next_id: ObserverId,
546}
547
548impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
549    /// Creates a new observable property with the given initial value
550    ///
551    /// # Arguments
552    ///
553    /// * `initial_value` - The starting value for this property
554    ///
555    /// # Examples
556    ///
557    /// ```rust
558    /// use observable_property::ObservableProperty;
559    ///
560    /// let property = ObservableProperty::new(42);
561    /// match property.get() {
562    ///     Ok(value) => assert_eq!(value, 42),
563    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
564    /// }
565    /// ```
566    pub fn new(initial_value: T) -> Self {
567        Self {
568            inner: Arc::new(RwLock::new(InnerProperty {
569                value: initial_value,
570                observers: HashMap::new(),
571                next_id: 0,
572            })),
573            max_threads: MAX_THREADS,
574        }
575    }
576
577    /// Creates a new observable property with a custom maximum thread count for async notifications
578    ///
579    /// This constructor allows you to customize the maximum number of threads used for
580    /// asynchronous observer notifications via `set_async()`. This is useful for tuning
581    /// performance based on your specific use case and system constraints.
582    ///
583    /// # Arguments
584    ///
585    /// * `initial_value` - The starting value for this property
586    /// * `max_threads` - Maximum number of threads to use for async notifications.
587    ///                   If 0 is provided, defaults to 4.
588    ///
589    /// # Thread Pool Behavior
590    ///
591    /// When `set_async()` is called, observers are divided into batches and each batch
592    /// runs in its own thread, up to the specified maximum. For example:
593    /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
594    /// - With 10 observers and `max_threads = 8`: 10 threads with 1 observer each
595    /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
596    ///
597    /// # Use Cases
598    ///
599    /// ## High-Throughput Systems
600    /// ```rust
601    /// use observable_property::ObservableProperty;
602    ///
603    /// // For systems with many CPU cores and CPU-bound observers
604    /// let property = ObservableProperty::with_max_threads(0, 8);
605    /// ```
606    ///
607    /// ## Resource-Constrained Systems
608    /// ```rust
609    /// use observable_property::ObservableProperty;
610    ///
611    /// // For embedded systems or memory-constrained environments
612    /// let property = ObservableProperty::with_max_threads(42, 1);
613    /// ```
614    ///
615    /// ## I/O-Heavy Observers
616    /// ```rust
617    /// use observable_property::ObservableProperty;
618    ///
619    /// // For observers that do network/database operations
620    /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
621    /// ```
622    ///
623    /// # Performance Considerations
624    ///
625    /// - **Higher values**: Better parallelism but more thread overhead and memory usage
626    /// - **Lower values**: Less overhead but potentially slower async notifications
627    /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
628    /// - **Zero value**: Automatically uses the default value (4)
629    ///
630    /// # Thread Safety
631    ///
632    /// This setting only affects async notifications (`set_async()`). Synchronous
633    /// operations (`set()`) always execute observers sequentially regardless of this setting.
634    ///
635    /// # Examples
636    ///
637    /// ```rust
638    /// use observable_property::ObservableProperty;
639    /// use std::sync::Arc;
640    ///
641    /// // Create property with custom thread pool size
642    /// let property = ObservableProperty::with_max_threads(42, 2);
643    ///
644    /// // Subscribe observers as usual
645    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
646    ///     println!("Value changed: {} -> {}", old, new);
647    /// })).expect("Failed to create subscription");
648    ///
649    /// // Async notifications will use at most 2 threads
650    /// property.set_async(100).expect("Failed to set value asynchronously");
651    /// ```
652    pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
653        let max_threads = if max_threads == 0 {
654            MAX_THREADS
655        } else {
656            max_threads
657        };
658        Self {
659            inner: Arc::new(RwLock::new(InnerProperty {
660                value: initial_value,
661                observers: HashMap::new(),
662                next_id: 0,
663            })),
664            max_threads,
665        }
666    }
667
668    /// Gets the current value of the property
669    ///
670    /// This method acquires a read lock, which allows multiple concurrent readers
671    /// but will block if a writer currently holds the lock.
672    ///
673    /// # Returns
674    ///
675    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
676    /// if the lock is poisoned.
677    ///
678    /// # Examples
679    ///
680    /// ```rust
681    /// use observable_property::ObservableProperty;
682    ///
683    /// let property = ObservableProperty::new("hello".to_string());
684    /// match property.get() {
685    ///     Ok(value) => assert_eq!(value, "hello"),
686    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
687    /// }
688    /// ```
689    pub fn get(&self) -> Result<T, PropertyError> {
690        self.inner
691            .read()
692            .map(|prop| prop.value.clone())
693            .map_err(|_| PropertyError::PoisonedLock)
694    }
695
696    /// Sets the property to a new value and notifies all observers
697    ///
698    /// This method will:
699    /// 1. Acquire a write lock (blocking other readers/writers)
700    /// 2. Update the value and capture a snapshot of observers
701    /// 3. Release the lock
702    /// 4. Notify all observers sequentially with the old and new values
703    ///
704    /// Observer notifications are wrapped in panic recovery to prevent one
705    /// misbehaving observer from affecting others.
706    ///
707    /// # Arguments
708    ///
709    /// * `new_value` - The new value to set
710    ///
711    /// # Returns
712    ///
713    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
714    ///
715    /// # Examples
716    ///
717    /// ```rust
718    /// use observable_property::ObservableProperty;
719    /// use std::sync::Arc;
720    ///
721    /// let property = ObservableProperty::new(10);
722    ///
723    /// property.subscribe(Arc::new(|old, new| {
724    ///     println!("Value changed from {} to {}", old, new);
725    /// })).map_err(|e| {
726    ///     eprintln!("Failed to subscribe: {}", e);
727    ///     e
728    /// })?;
729    ///
730    /// property.set(20).map_err(|e| {
731    ///     eprintln!("Failed to set property value: {}", e);
732    ///     e
733    /// })?; // Triggers observer notification
734    /// # Ok::<(), observable_property::PropertyError>(())
735    /// ```
736    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
737        let (old_value, observers_snapshot) = {
738            let mut prop = self
739                .inner
740                .write()
741                .map_err(|_| PropertyError::WriteLockError {
742                    context: "setting property value".to_string(),
743                })?;
744
745            let old_value = prop.value.clone();
746            prop.value = new_value.clone();
747            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
748            (old_value, observers_snapshot)
749        };
750
751        for observer in observers_snapshot {
752            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
753                observer(&old_value, &new_value);
754            })) {
755                eprintln!("Observer panic: {:?}", e);
756            }
757        }
758
759        Ok(())
760    }
761
762    /// Sets the property to a new value and notifies observers asynchronously
763    ///
764    /// This method is similar to `set()` but spawns observers in background threads
765    /// for non-blocking operation. This is useful when observers might perform
766    /// time-consuming operations.
767    ///
768    /// Observers are batched into groups and each batch runs in its own thread
769    /// to limit resource usage while still providing parallelism.
770    ///
771    /// # Arguments
772    ///
773    /// * `new_value` - The new value to set
774    ///
775    /// # Returns
776    ///
777    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
778    /// Note that this only indicates the property was updated successfully;
779    /// observer execution happens asynchronously.
780    ///
781    /// # Examples
782    ///
783    /// ```rust
784    /// use observable_property::ObservableProperty;
785    /// use std::sync::Arc;
786    /// use std::time::Duration;
787    ///
788    /// let property = ObservableProperty::new(0);
789    ///
790    /// property.subscribe(Arc::new(|old, new| {
791    ///     // This observer does slow work but won't block the caller
792    ///     std::thread::sleep(Duration::from_millis(100));
793    ///     println!("Slow observer: {} -> {}", old, new);
794    /// })).map_err(|e| {
795    ///     eprintln!("Failed to subscribe: {}", e);
796    ///     e
797    /// })?;
798    ///
799    /// // This returns immediately even though observer is slow
800    /// property.set_async(42).map_err(|e| {
801    ///     eprintln!("Failed to set value asynchronously: {}", e);
802    ///     e
803    /// })?;
804    /// # Ok::<(), observable_property::PropertyError>(())
805    /// ```
806    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
807        let (old_value, observers_snapshot) = {
808            let mut prop = self
809                .inner
810                .write()
811                .map_err(|_| PropertyError::WriteLockError {
812                    context: "setting property value".to_string(),
813                })?;
814
815            let old_value = prop.value.clone();
816            prop.value = new_value.clone();
817            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
818            (old_value, observers_snapshot)
819        };
820
821        if observers_snapshot.is_empty() {
822            return Ok(());
823        }
824
825        let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
826
827        for batch in observers_snapshot.chunks(observers_per_thread) {
828            let batch_observers = batch.to_vec();
829            let old_val = old_value.clone();
830            let new_val = new_value.clone();
831
832            thread::spawn(move || {
833                for observer in batch_observers {
834                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
835                        observer(&old_val, &new_val);
836                    })) {
837                        eprintln!("Observer panic in batch thread: {:?}", e);
838                    }
839                }
840            });
841        }
842
843        Ok(())
844    }
845
846    /// Subscribes an observer function to be called when the property changes
847    ///
848    /// The observer function will be called with the old and new values whenever
849    /// the property is modified via `set()` or `set_async()`.
850    ///
851    /// # Arguments
852    ///
853    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
854    ///
855    /// # Returns
856    ///
857    /// `Ok(ObserverId)` containing a unique identifier for this observer,
858    /// or `Err(PropertyError)` if the lock is poisoned.
859    ///
860    /// # Examples
861    ///
862    /// ```rust
863    /// use observable_property::ObservableProperty;
864    /// use std::sync::Arc;
865    ///
866    /// let property = ObservableProperty::new(0);
867    ///
868    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
869    ///     println!("Property changed from {} to {}", old_value, new_value);
870    /// })).map_err(|e| {
871    ///     eprintln!("Failed to subscribe observer: {}", e);
872    ///     e
873    /// })?;
874    ///
875    /// // Later, unsubscribe using the returned ID
876    /// property.unsubscribe(observer_id).map_err(|e| {
877    ///     eprintln!("Failed to unsubscribe observer: {}", e);
878    ///     e
879    /// })?;
880    /// # Ok::<(), observable_property::PropertyError>(())
881    /// ```
882    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
883        let mut prop = self
884            .inner
885            .write()
886            .map_err(|_| PropertyError::WriteLockError {
887                context: "subscribing observer".to_string(),
888            })?;
889
890        let id = prop.next_id;
891        prop.next_id += 1;
892        prop.observers.insert(id, observer);
893        Ok(id)
894    }
895
896    /// Removes an observer identified by its ID
897    ///
898    /// # Arguments
899    ///
900    /// * `id` - The observer ID returned by `subscribe()`
901    ///
902    /// # Returns
903    ///
904    /// `Ok(bool)` where `true` means the observer was found and removed,
905    /// `false` means no observer with that ID existed.
906    /// Returns `Err(PropertyError)` if the lock is poisoned.
907    ///
908    /// # Examples
909    ///
910    /// ```rust
911    /// use observable_property::ObservableProperty;
912    /// use std::sync::Arc;
913    ///
914    /// let property = ObservableProperty::new(0);
915    /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
916    ///     eprintln!("Failed to subscribe: {}", e);
917    ///     e
918    /// })?;
919    ///
920    /// let was_removed = property.unsubscribe(id).map_err(|e| {
921    ///     eprintln!("Failed to unsubscribe: {}", e);
922    ///     e
923    /// })?;
924    /// assert!(was_removed); // Observer existed and was removed
925    ///
926    /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
927    ///     eprintln!("Failed to unsubscribe again: {}", e);
928    ///     e
929    /// })?;
930    /// assert!(!was_removed_again); // Observer no longer exists
931    /// # Ok::<(), observable_property::PropertyError>(())
932    /// ```
933    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
934        let mut prop = self
935            .inner
936            .write()
937            .map_err(|_| PropertyError::WriteLockError {
938                context: "unsubscribing observer".to_string(),
939            })?;
940
941        let was_present = prop.observers.remove(&id).is_some();
942        Ok(was_present)
943    }
944
945    /// Subscribes an observer that only gets called when a filter condition is met
946    ///
947    /// This is useful for observing only specific types of changes, such as
948    /// when a value increases or crosses a threshold.
949    ///
950    /// # Arguments
951    ///
952    /// * `observer` - The observer function to call when the filter passes
953    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
954    ///
955    /// # Returns
956    ///
957    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
958    ///
959    /// # Examples
960    ///
961    /// ```rust
962    /// use observable_property::ObservableProperty;
963    /// use std::sync::Arc;
964    ///
965    /// let property = ObservableProperty::new(0);
966    ///
967    /// // Only notify when value increases
968    /// let id = property.subscribe_filtered(
969    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
970    ///     |old, new| new > old
971    /// ).map_err(|e| {
972    ///     eprintln!("Failed to subscribe filtered observer: {}", e);
973    ///     e
974    /// })?;
975    ///
976    /// property.set(10).map_err(|e| {
977    ///     eprintln!("Failed to set value: {}", e);
978    ///     e
979    /// })?; // Triggers observer (0 -> 10)
980    /// property.set(5).map_err(|e| {
981    ///     eprintln!("Failed to set value: {}", e);
982    ///     e
983    /// })?;  // Does NOT trigger observer (10 -> 5)
984    /// property.set(15).map_err(|e| {
985    ///     eprintln!("Failed to set value: {}", e);
986    ///     e
987    /// })?; // Triggers observer (5 -> 15)
988    /// # Ok::<(), observable_property::PropertyError>(())
989    /// ```
990    pub fn subscribe_filtered<F>(
991        &self,
992        observer: Observer<T>,
993        filter: F,
994    ) -> Result<ObserverId, PropertyError>
995    where
996        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
997    {
998        let filter = Arc::new(filter);
999        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1000            if filter(old_val, new_val) {
1001                observer(old_val, new_val);
1002            }
1003        });
1004
1005        self.subscribe(filtered_observer)
1006    }
1007
1008    pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1009        let prop = self
1010            .inner
1011            .read()
1012            .map_err(|_| PropertyError::ReadLockError {
1013                context: "notifying observers".to_string(),
1014            })?;
1015
1016        for (old_val, new_val) in changes {
1017            for observer in prop.observers.values() {
1018                observer(&old_val, &new_val);
1019            }
1020        }
1021        Ok(())
1022    }
1023
1024    /// Subscribes an observer and returns a RAII guard for automatic cleanup
1025    ///
1026    /// This method is similar to `subscribe()` but returns a `Subscription` object
1027    /// that automatically removes the observer when it goes out of scope. This
1028    /// provides a more convenient and safer alternative to manual subscription
1029    /// management.
1030    ///
1031    /// # Arguments
1032    ///
1033    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1034    ///
1035    /// # Returns
1036    ///
1037    /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
1038    /// or `Err(PropertyError)` if the lock is poisoned.
1039    ///
1040    /// # Examples
1041    ///
1042    /// ## Basic RAII Subscription
1043    ///
1044    /// ```rust
1045    /// use observable_property::ObservableProperty;
1046    /// use std::sync::Arc;
1047    ///
1048    /// # fn main() -> Result<(), observable_property::PropertyError> {
1049    /// let property = ObservableProperty::new(0);
1050    ///
1051    /// {
1052    ///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1053    ///         println!("Value: {} -> {}", old, new);
1054    ///     }))?;
1055    ///
1056    ///     property.set(42)?; // Prints: "Value: 0 -> 42"
1057    ///     property.set(100)?; // Prints: "Value: 42 -> 100"
1058    ///
1059    ///     // Automatic cleanup when _subscription goes out of scope
1060    /// }
1061    ///
1062    /// property.set(200)?; // No output - subscription was cleaned up
1063    /// # Ok(())
1064    /// # }
1065    /// ```
1066    ///
1067    /// ## Comparison with Manual Management
1068    ///
1069    /// ```rust
1070    /// use observable_property::ObservableProperty;
1071    /// use std::sync::Arc;
1072    ///
1073    /// # fn main() -> Result<(), observable_property::PropertyError> {
1074    /// let property = ObservableProperty::new("initial".to_string());
1075    ///
1076    /// // Method 1: Manual subscription management (traditional approach)
1077    /// let observer_id = property.subscribe(Arc::new(|old, new| {
1078    ///     println!("Manual: {} -> {}", old, new);
1079    /// }))?;
1080    ///
1081    /// // Method 2: RAII subscription management (recommended)
1082    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1083    ///     println!("RAII: {} -> {}", old, new);
1084    /// }))?;
1085    ///
1086    /// // Both observers will be called
1087    /// property.set("changed".to_string())?;
1088    /// // Prints:
1089    /// // "Manual: initial -> changed"
1090    /// // "RAII: initial -> changed"
1091    ///
1092    /// // Manual cleanup required for first observer
1093    /// property.unsubscribe(observer_id)?;
1094    ///
1095    /// // Second observer (_subscription) is automatically cleaned up when
1096    /// // the variable goes out of scope - no manual intervention needed
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    ///
1101    /// ## Error Handling with Early Returns
1102    ///
1103    /// ```rust
1104    /// use observable_property::ObservableProperty;
1105    /// use std::sync::Arc;
1106    ///
1107    /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1108    ///     let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1109    ///         println!("Processing: {} -> {}", old, new);
1110    ///     }))?;
1111    ///
1112    ///     property.set(1)?;
1113    ///     
1114    ///     if property.get()? > 0 {
1115    ///         return Ok(()); // Subscription automatically cleaned up on early return
1116    ///     }
1117    ///
1118    ///     property.set(2)?;
1119    ///     Ok(()) // Subscription automatically cleaned up on normal return
1120    /// }
1121    ///
1122    /// # fn main() -> Result<(), observable_property::PropertyError> {
1123    /// let property = ObservableProperty::new(0);
1124    /// process_with_monitoring(&property)?; // Monitoring active only during function call
1125    /// property.set(99)?; // No monitoring output - subscription was cleaned up
1126    /// # Ok(())
1127    /// # }
1128    /// ```
1129    ///
1130    /// ## Multi-threaded Subscription Management
1131    ///
1132    /// ```rust
1133    /// use observable_property::ObservableProperty;
1134    /// use std::sync::Arc;
1135    /// use std::thread;
1136    ///
1137    /// # fn main() -> Result<(), observable_property::PropertyError> {
1138    /// let property = Arc::new(ObservableProperty::new(0));
1139    /// let property_clone = property.clone();
1140    ///
1141    /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1142    ///     let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1143    ///         println!("Thread observer: {} -> {}", old, new);
1144    ///     }))?;
1145    ///
1146    ///     property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1147    ///     
1148    ///     // Subscription automatically cleaned up when thread ends
1149    ///     Ok(())
1150    /// });
1151    ///
1152    /// handle.join().unwrap()?;
1153    /// property.set(100)?; // No output - thread subscription was cleaned up
1154    /// # Ok(())
1155    /// # }
1156    /// ```
1157    ///
1158    /// # Use Cases
1159    ///
1160    /// This method is particularly useful in scenarios such as:
1161    /// - Temporary observers that should be active only during a specific scope
1162    /// - Error-prone code where manual cleanup might be forgotten
1163    /// - Complex control flow where multiple exit points make manual cleanup difficult
1164    /// - Resource-constrained environments where observer leaks are problematic
1165    pub fn subscribe_with_subscription(
1166        &self,
1167        observer: Observer<T>,
1168    ) -> Result<Subscription<T>, PropertyError> {
1169        let id = self.subscribe(observer)?;
1170        Ok(Subscription {
1171            inner: Arc::clone(&self.inner),
1172            id,
1173        })
1174    }
1175
1176    /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1177    ///
1178    /// This method combines the functionality of `subscribe_filtered()` with the automatic
1179    /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1180    /// called when the filter condition is satisfied, and it will be automatically
1181    /// unsubscribed when the returned `Subscription` goes out of scope.
1182    ///
1183    /// # Arguments
1184    ///
1185    /// * `observer` - The observer function to call when the filter passes
1186    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1187    ///
1188    /// # Returns
1189    ///
1190    /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1191    /// or `Err(PropertyError)` if the lock is poisoned.
1192    ///
1193    /// # Examples
1194    ///
1195    /// ## Basic Filtered RAII Subscription
1196    ///
1197    /// ```rust
1198    /// use observable_property::ObservableProperty;
1199    /// use std::sync::Arc;
1200    ///
1201    /// # fn main() -> Result<(), observable_property::PropertyError> {
1202    /// let counter = ObservableProperty::new(0);
1203    ///
1204    /// {
1205    ///     // Monitor only increases with automatic cleanup
1206    ///     let _increase_monitor = counter.subscribe_filtered_with_subscription(
1207    ///         Arc::new(|old, new| {
1208    ///             println!("Counter increased: {} -> {}", old, new);
1209    ///         }),
1210    ///         |old, new| new > old
1211    ///     )?;
1212    ///
1213    ///     counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
1214    ///     counter.set(3)?;  // No output (decrease)
1215    ///     counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"
1216    ///
1217    ///     // Subscription automatically cleaned up when leaving scope
1218    /// }
1219    ///
1220    /// counter.set(10)?; // No output - subscription was cleaned up
1221    /// # Ok(())
1222    /// # }
1223    /// ```
1224    ///
1225    /// ## Multi-Condition Temperature Monitoring
1226    ///
1227    /// ```rust
1228    /// use observable_property::ObservableProperty;
1229    /// use std::sync::Arc;
1230    ///
1231    /// # fn main() -> Result<(), observable_property::PropertyError> {
1232    /// let temperature = ObservableProperty::new(20.0_f64);
1233    ///
1234    /// {
1235    ///     // Create filtered subscription that only triggers for significant temperature increases
1236    ///     let _heat_warning = temperature.subscribe_filtered_with_subscription(
1237    ///         Arc::new(|old_temp, new_temp| {
1238    ///             println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1239    ///                      old_temp, new_temp);
1240    ///         }),
1241    ///         |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
1242    ///     )?;
1243    ///
1244    ///     // Create another filtered subscription for cooling alerts
1245    ///     let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1246    ///         Arc::new(|old_temp, new_temp| {
1247    ///             println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1248    ///                      old_temp, new_temp);
1249    ///         }),
1250    ///         |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
1251    ///     )?;
1252    ///
1253    ///     // Test the filters
1254    ///     temperature.set(22.0)?; // No alerts (increase of only 2°C)
1255    ///     temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1256    ///     temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1257    ///
1258    ///     // Both subscriptions are automatically cleaned up when they go out of scope
1259    /// }
1260    ///
1261    /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1262    /// # Ok(())
1263    /// # }
1264    /// ```
1265    ///
1266    /// ## Conditional Monitoring with Complex Filters
1267    ///
1268    /// ```rust
1269    /// use observable_property::ObservableProperty;
1270    /// use std::sync::Arc;
1271    ///
1272    /// # fn main() -> Result<(), observable_property::PropertyError> {
1273    /// let stock_price = ObservableProperty::new(100.0_f64);
1274    ///
1275    /// {
1276    ///     // Monitor significant price movements (> 5% change)
1277    ///     let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1278    ///         Arc::new(|old_price, new_price| {
1279    ///             let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1280    ///             println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1281    ///                     old_price, new_price, change_percent);
1282    ///         }),
1283    ///         |old, new| {
1284    ///             let change_percent = ((new - old) / old * 100.0).abs();
1285    ///             change_percent > 5.0  // Trigger on > 5% change
1286    ///         }
1287    ///     )?;
1288    ///
1289    ///     stock_price.set(103.0)?; // No alert (3% change)
1290    ///     stock_price.set(108.0)?; // Alert triggered (4.85% from 103, but let's say it rounds up)
1291    ///     stock_price.set(95.0)?;  // Alert triggered (12% decrease)
1292    ///
1293    ///     // Subscription automatically cleaned up when leaving scope
1294    /// }
1295    ///
1296    /// stock_price.set(200.0)?; // No alert - monitoring ended
1297    /// # Ok(())
1298    /// # }
1299    /// ```
1300    ///
1301    /// ## Cross-Thread Filtered Monitoring
1302    ///
1303    /// ```rust
1304    /// use observable_property::ObservableProperty;
1305    /// use std::sync::Arc;
1306    /// use std::thread;
1307    /// use std::time::Duration;
1308    ///
1309    /// # fn main() -> Result<(), observable_property::PropertyError> {
1310    /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1311    /// let latency_clone = network_latency.clone();
1312    ///
1313    /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1314    ///     // Monitor high latency in background thread with automatic cleanup
1315    ///     let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1316    ///         Arc::new(|old_ms, new_ms| {
1317    ///             println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1318    ///         }),
1319    ///         |_, new| *new > 100  // Alert when latency exceeds 100ms
1320    ///     )?;
1321    ///
1322    ///     // Simulate monitoring for a short time
1323    ///     thread::sleep(Duration::from_millis(10));
1324    ///     
1325    ///     // Subscription automatically cleaned up when thread ends
1326    ///     Ok(())
1327    /// });
1328    ///
1329    /// // Simulate network conditions
1330    /// network_latency.set(80)?;  // No alert (under threshold)
1331    /// network_latency.set(150)?; // Alert triggered in background thread
1332    ///
1333    /// monitor_handle.join().unwrap()?;
1334    /// network_latency.set(200)?; // No alert - background monitoring ended
1335    /// # Ok(())
1336    /// # }
1337    /// ```
1338    ///
1339    /// # Use Cases
1340    ///
1341    /// This method is ideal for:
1342    /// - Threshold-based monitoring with automatic cleanup
1343    /// - Temporary conditional observers in specific code blocks
1344    /// - Event-driven systems where observers should be active only during certain phases
1345    /// - Resource management scenarios where filtered observers have limited lifetimes
1346    ///
1347    /// # Performance Notes
1348    ///
1349    /// The filter function is evaluated for every property change, so it should be
1350    /// lightweight. Complex filtering logic should be optimized to avoid performance
1351    /// bottlenecks, especially in high-frequency update scenarios.
1352    pub fn subscribe_filtered_with_subscription<F>(
1353        &self,
1354        observer: Observer<T>,
1355        filter: F,
1356    ) -> Result<Subscription<T>, PropertyError>
1357    where
1358        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1359    {
1360        let id = self.subscribe_filtered(observer, filter)?;
1361        Ok(Subscription {
1362            inner: Arc::clone(&self.inner),
1363            id,
1364        })
1365    }
1366}
1367
1368impl<T: Clone> Clone for ObservableProperty<T> {
1369    /// Creates a new reference to the same observable property
1370    ///
1371    /// This creates a new `ObservableProperty` instance that shares the same
1372    /// underlying data with the original. Changes made through either instance
1373    /// will be visible to observers subscribed through both instances.
1374    ///
1375    /// # Examples
1376    ///
1377    /// ```rust
1378    /// use observable_property::ObservableProperty;
1379    /// use std::sync::Arc;
1380    ///
1381    /// let property1 = ObservableProperty::new(42);
1382    /// let property2 = property1.clone();
1383    ///
1384    /// property2.subscribe(Arc::new(|old, new| {
1385    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
1386    /// })).map_err(|e| {
1387    ///     eprintln!("Failed to subscribe: {}", e);
1388    ///     e
1389    /// })?;
1390    ///
1391    /// // This change through property1 will trigger the observer on property2
1392    /// property1.set(100).map_err(|e| {
1393    ///     eprintln!("Failed to set value: {}", e);
1394    ///     e
1395    /// })?;
1396    /// # Ok::<(), observable_property::PropertyError>(())
1397    /// ```
1398    fn clone(&self) -> Self {
1399        Self {
1400            inner: Arc::clone(&self.inner),
1401            max_threads: self.max_threads,
1402        }
1403    }
1404}
1405
1406impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1407    /// Debug implementation that shows the current value if accessible
1408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1409        match self.get() {
1410            Ok(value) => f
1411                .debug_struct("ObservableProperty")
1412                .field("value", &value)
1413                .field("observers_count", &"[hidden]")
1414                .field("max_threads", &self.max_threads)
1415                .finish(),
1416            Err(_) => f
1417                .debug_struct("ObservableProperty")
1418                .field("value", &"[inaccessible]")
1419                .field("observers_count", &"[hidden]")
1420                .finish(),
1421        }
1422    }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427    use super::*;
1428    use std::sync::atomic::{AtomicUsize, Ordering};
1429    use std::time::Duration;
1430
1431    #[test]
1432    fn test_property_creation_and_basic_operations() {
1433        let prop = ObservableProperty::new(42);
1434
1435        // Test initial value
1436        match prop.get() {
1437            Ok(value) => assert_eq!(value, 42),
1438            Err(e) => panic!("Failed to get initial value: {}", e),
1439        }
1440
1441        // Test setting value
1442        if let Err(e) = prop.set(100) {
1443            panic!("Failed to set value: {}", e);
1444        }
1445
1446        match prop.get() {
1447            Ok(value) => assert_eq!(value, 100),
1448            Err(e) => panic!("Failed to get updated value: {}", e),
1449        }
1450    }
1451
1452    #[test]
1453    fn test_observer_subscription_and_notification() {
1454        let prop = ObservableProperty::new("initial".to_string());
1455        let notification_count = Arc::new(AtomicUsize::new(0));
1456        let last_old_value = Arc::new(RwLock::new(String::new()));
1457        let last_new_value = Arc::new(RwLock::new(String::new()));
1458
1459        let count_clone = notification_count.clone();
1460        let old_clone = last_old_value.clone();
1461        let new_clone = last_new_value.clone();
1462
1463        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
1464            count_clone.fetch_add(1, Ordering::SeqCst);
1465            if let Ok(mut old_val) = old_clone.write() {
1466                *old_val = old.clone();
1467            }
1468            if let Ok(mut new_val) = new_clone.write() {
1469                *new_val = new.clone();
1470            }
1471        })) {
1472            Ok(id) => id,
1473            Err(e) => panic!("Failed to subscribe observer: {}", e),
1474        };
1475
1476        // Change value and verify notification
1477        if let Err(e) = prop.set("changed".to_string()) {
1478            panic!("Failed to set property value: {}", e);
1479        }
1480
1481        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1482
1483        match last_old_value.read() {
1484            Ok(old_val) => assert_eq!(*old_val, "initial"),
1485            Err(e) => panic!("Failed to read old value: {:?}", e),
1486        }
1487
1488        match last_new_value.read() {
1489            Ok(new_val) => assert_eq!(*new_val, "changed"),
1490            Err(e) => panic!("Failed to read new value: {:?}", e),
1491        }
1492
1493        // Test unsubscription
1494        match prop.unsubscribe(observer_id) {
1495            Ok(was_present) => assert!(was_present),
1496            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
1497        }
1498
1499        // Change value again - should not notify
1500        if let Err(e) = prop.set("not_notified".to_string()) {
1501            panic!("Failed to set property value after unsubscribe: {}", e);
1502        }
1503        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1504    }
1505
1506    #[test]
1507    fn test_filtered_observer() {
1508        let prop = ObservableProperty::new(0i32);
1509        let notification_count = Arc::new(AtomicUsize::new(0));
1510        let count_clone = notification_count.clone();
1511
1512        // Observer only triggered when value increases
1513        let observer_id = match prop.subscribe_filtered(
1514            Arc::new(move |_, _| {
1515                count_clone.fetch_add(1, Ordering::SeqCst);
1516            }),
1517            |old, new| new > old,
1518        ) {
1519            Ok(id) => id,
1520            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
1521        };
1522
1523        // Should trigger (0 -> 5)
1524        if let Err(e) = prop.set(5) {
1525            panic!("Failed to set property value to 5: {}", e);
1526        }
1527        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1528
1529        // Should NOT trigger (5 -> 3)
1530        if let Err(e) = prop.set(3) {
1531            panic!("Failed to set property value to 3: {}", e);
1532        }
1533        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1534
1535        // Should trigger (3 -> 10)
1536        if let Err(e) = prop.set(10) {
1537            panic!("Failed to set property value to 10: {}", e);
1538        }
1539        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
1540
1541        match prop.unsubscribe(observer_id) {
1542            Ok(_) => {}
1543            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
1544        }
1545    }
1546
1547    #[test]
1548    fn test_thread_safety_concurrent_reads() {
1549        let prop = Arc::new(ObservableProperty::new(42i32));
1550        let num_threads = 10;
1551        let reads_per_thread = 100;
1552
1553        let handles: Vec<_> = (0..num_threads)
1554            .map(|_| {
1555                let prop_clone = prop.clone();
1556                thread::spawn(move || {
1557                    for _ in 0..reads_per_thread {
1558                        match prop_clone.get() {
1559                            Ok(value) => assert_eq!(value, 42),
1560                            Err(e) => panic!("Failed to read property value: {}", e),
1561                        }
1562                        thread::sleep(Duration::from_millis(1));
1563                    }
1564                })
1565            })
1566            .collect();
1567
1568        for handle in handles {
1569            if let Err(e) = handle.join() {
1570                panic!("Thread failed to complete: {:?}", e);
1571            }
1572        }
1573    }
1574
1575    #[test]
1576    fn test_async_set_performance() {
1577        let prop = ObservableProperty::new(0i32);
1578        let slow_observer_count = Arc::new(AtomicUsize::new(0));
1579        let count_clone = slow_observer_count.clone();
1580
1581        // Add observer that simulates slow work
1582        let _id = match prop.subscribe(Arc::new(move |_, _| {
1583            thread::sleep(Duration::from_millis(50));
1584            count_clone.fetch_add(1, Ordering::SeqCst);
1585        })) {
1586            Ok(id) => id,
1587            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
1588        };
1589
1590        // Test synchronous set (should be slow)
1591        let start = std::time::Instant::now();
1592        if let Err(e) = prop.set(1) {
1593            panic!("Failed to set property value synchronously: {}", e);
1594        }
1595        let sync_duration = start.elapsed();
1596
1597        // Test asynchronous set (should be fast)
1598        let start = std::time::Instant::now();
1599        if let Err(e) = prop.set_async(2) {
1600            panic!("Failed to set property value asynchronously: {}", e);
1601        }
1602        let async_duration = start.elapsed();
1603
1604        // Async should be much faster than sync
1605        assert!(async_duration < sync_duration);
1606        assert!(async_duration.as_millis() < 10); // Should be very fast
1607
1608        // Wait for async observer to complete
1609        thread::sleep(Duration::from_millis(100));
1610
1611        // Both observers should have been called
1612        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
1613    }
1614
1615    #[test]
1616    fn test_lock_poisoning() {
1617        // Create a property that we'll poison
1618        let prop = Arc::new(ObservableProperty::new(0));
1619        let prop_clone = prop.clone();
1620
1621        // Create a thread that will deliberately poison the lock
1622        let poison_thread = thread::spawn(move || {
1623            // Get write lock and then panic, which will poison the lock
1624            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
1625            panic!("Deliberate panic to poison the lock");
1626        });
1627
1628        // Wait for the thread to complete (it will panic)
1629        let _ = poison_thread.join();
1630
1631        // Now the lock should be poisoned, verify all operations return appropriate errors
1632        match prop.get() {
1633            Ok(_) => panic!("get() should fail on a poisoned lock"),
1634            Err(e) => match e {
1635                PropertyError::PoisonedLock => {} // Expected error
1636                _ => panic!("Expected PoisonedLock error, got: {:?}", e),
1637            },
1638        }
1639
1640        match prop.set(42) {
1641            Ok(_) => panic!("set() should fail on a poisoned lock"),
1642            Err(e) => match e {
1643                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
1644                _ => panic!("Expected lock-related error, got: {:?}", e),
1645            },
1646        }
1647
1648        match prop.subscribe(Arc::new(|_, _| {})) {
1649            Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
1650            Err(e) => match e {
1651                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
1652                _ => panic!("Expected lock-related error, got: {:?}", e),
1653            },
1654        }
1655    }
1656
1657    #[test]
1658    fn test_observer_panic_isolation() {
1659        let prop = ObservableProperty::new(0);
1660        let call_counts = Arc::new(AtomicUsize::new(0));
1661
1662        // First observer will panic
1663        let panic_observer_id = prop
1664            .subscribe(Arc::new(|_, _| {
1665                panic!("This observer deliberately panics");
1666            }))
1667            .expect("Failed to subscribe panic observer");
1668
1669        // Second observer should still be called despite first one panicking
1670        let counts = call_counts.clone();
1671        let normal_observer_id = prop
1672            .subscribe(Arc::new(move |_, _| {
1673                counts.fetch_add(1, Ordering::SeqCst);
1674            }))
1675            .expect("Failed to subscribe normal observer");
1676
1677        // Trigger the observers - this shouldn't panic despite the first observer panicking
1678        prop.set(42).expect("Failed to set property value");
1679
1680        // Verify the second observer was still called
1681        assert_eq!(call_counts.load(Ordering::SeqCst), 1);
1682
1683        // Clean up
1684        prop.unsubscribe(panic_observer_id).expect("Failed to unsubscribe panic observer");
1685        prop.unsubscribe(normal_observer_id).expect("Failed to unsubscribe normal observer");
1686    }
1687
1688    #[test]
1689    fn test_unsubscribe_nonexistent_observer() {
1690        let property = ObservableProperty::new(0);
1691
1692        // Generate a valid observer ID
1693        let valid_id = property.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe test observer");
1694
1695        // Create an ID that doesn't exist (valid_id + 1000 should not exist)
1696        let nonexistent_id = valid_id + 1000;
1697
1698        // Test unsubscribing a nonexistent observer
1699        match property.unsubscribe(nonexistent_id) {
1700            Ok(was_present) => {
1701                assert!(
1702                    !was_present,
1703                    "Unsubscribe should return false for nonexistent ID"
1704                );
1705            }
1706            Err(e) => panic!("Unsubscribe returned error: {:?}", e),
1707        }
1708
1709        // Also verify that unsubscribing twice returns false the second time
1710        property.unsubscribe(valid_id).expect("Failed first unsubscribe"); // First unsubscribe should return true
1711
1712        let result = property.unsubscribe(valid_id).expect("Failed second unsubscribe");
1713        assert!(!result, "Second unsubscribe should return false");
1714    }
1715
1716    #[test]
1717    fn test_observer_id_wraparound() {
1718        let prop = ObservableProperty::new(0);
1719
1720        // Test that observer IDs increment properly and don't wrap around unexpectedly
1721        let id1 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 1");
1722        let id2 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 2");
1723        let id3 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 3");
1724
1725        assert!(id2 > id1, "Observer IDs should increment");
1726        assert!(id3 > id2, "Observer IDs should continue incrementing");
1727        assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
1728        assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");
1729
1730        // Clean up
1731        prop.unsubscribe(id1).expect("Failed to unsubscribe observer 1");
1732        prop.unsubscribe(id2).expect("Failed to unsubscribe observer 2");
1733        prop.unsubscribe(id3).expect("Failed to unsubscribe observer 3");
1734    }
1735
1736    #[test]
1737    fn test_concurrent_subscribe_unsubscribe() {
1738        let prop = Arc::new(ObservableProperty::new(0));
1739        let num_threads = 8;
1740        let operations_per_thread = 100;
1741
1742        let handles: Vec<_> = (0..num_threads)
1743            .map(|thread_id| {
1744                let prop_clone = prop.clone();
1745                thread::spawn(move || {
1746                    let mut local_ids = Vec::new();
1747
1748                    for i in 0..operations_per_thread {
1749                        // Subscribe an observer
1750                        let observer_id = prop_clone
1751                            .subscribe(Arc::new(move |old, new| {
1752                                // Do some work to simulate real observer
1753                                let _ = thread_id + i + old + new;
1754                            }))
1755                            .expect("Subscribe should succeed");
1756
1757                        local_ids.push(observer_id);
1758
1759                        // Occasionally unsubscribe some observers
1760                        if i % 10 == 0 && !local_ids.is_empty() {
1761                            let idx = i % local_ids.len();
1762                            let id_to_remove = local_ids.remove(idx);
1763                            prop_clone
1764                                .unsubscribe(id_to_remove)
1765                                .expect("Unsubscribe should succeed");
1766                        }
1767                    }
1768
1769                    // Clean up remaining observers
1770                    for id in local_ids {
1771                        prop_clone
1772                            .unsubscribe(id)
1773                            .expect("Final cleanup should succeed");
1774                    }
1775                })
1776            })
1777            .collect();
1778
1779        // Wait for all threads to complete
1780        for handle in handles {
1781            handle.join().expect("Thread should complete successfully");
1782        }
1783
1784        // Property should still be functional
1785        prop.set(42)
1786            .expect("Property should still work after concurrent operations");
1787    }
1788
1789    #[test]
1790    fn test_multiple_observer_panics_isolation() {
1791        let prop = ObservableProperty::new(0);
1792        let successful_calls = Arc::new(AtomicUsize::new(0));
1793
1794        // Create multiple observers that will panic
1795        let _panic_id1 = prop
1796            .subscribe(Arc::new(|_, _| {
1797                panic!("First panic observer");
1798            }))
1799            .expect("Failed to subscribe first panic observer");
1800
1801        let _panic_id2 = prop
1802            .subscribe(Arc::new(|_, _| {
1803                panic!("Second panic observer");
1804            }))
1805            .expect("Failed to subscribe second panic observer");
1806
1807        // Create observers that should succeed despite the panics
1808        let count1 = successful_calls.clone();
1809        let _success_id1 = prop
1810            .subscribe(Arc::new(move |_, _| {
1811                count1.fetch_add(1, Ordering::SeqCst);
1812            }))
1813            .expect("Failed to subscribe first success observer");
1814
1815        let count2 = successful_calls.clone();
1816        let _success_id2 = prop
1817            .subscribe(Arc::new(move |_, _| {
1818                count2.fetch_add(1, Ordering::SeqCst);
1819            }))
1820            .expect("Failed to subscribe second success observer");
1821
1822        // Trigger all observers
1823        prop.set(42).expect("Failed to set property value for panic isolation test");
1824
1825        // Both successful observers should have been called despite the panics
1826        assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
1827    }
1828
1829    #[test]
1830    fn test_async_observer_panic_isolation() {
1831        let prop = ObservableProperty::new(0);
1832        let successful_calls = Arc::new(AtomicUsize::new(0));
1833
1834        // Create observer that will panic
1835        let _panic_id = prop
1836            .subscribe(Arc::new(|_, _| {
1837                panic!("Async panic observer");
1838            }))
1839            .expect("Failed to subscribe async panic observer");
1840
1841        // Create observer that should succeed
1842        let count = successful_calls.clone();
1843        let _success_id = prop
1844            .subscribe(Arc::new(move |_, _| {
1845                count.fetch_add(1, Ordering::SeqCst);
1846            }))
1847            .expect("Failed to subscribe async success observer");
1848
1849        // Use async set to trigger observers in background threads
1850        prop.set_async(42).expect("Failed to set property value asynchronously");
1851
1852        // Wait for async observers to complete
1853        thread::sleep(Duration::from_millis(100));
1854
1855        // The successful observer should have been called despite the panic
1856        assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
1857    }
1858
1859    #[test]
1860    fn test_very_large_observer_count() {
1861        let prop = ObservableProperty::new(0);
1862        let total_calls = Arc::new(AtomicUsize::new(0));
1863        let observer_count = 1000;
1864
1865        // Subscribe many observers
1866        let mut observer_ids = Vec::with_capacity(observer_count);
1867        for i in 0..observer_count {
1868            let count = total_calls.clone();
1869            let id = prop
1870                .subscribe(Arc::new(move |old, new| {
1871                    count.fetch_add(1, Ordering::SeqCst);
1872                    // Verify we got the right values
1873                    assert_eq!(*old, 0);
1874                    assert_eq!(*new, i + 1);
1875                }))
1876                .expect("Failed to subscribe large observer count test observer");
1877            observer_ids.push(id);
1878        }
1879
1880        // Trigger all observers
1881        prop.set(observer_count).expect("Failed to set property value for large observer count test");
1882
1883        // All observers should have been called
1884        assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
1885
1886        // Clean up
1887        for id in observer_ids {
1888            prop.unsubscribe(id).expect("Failed to unsubscribe observer in large count test");
1889        }
1890    }
1891
1892    #[test]
1893    fn test_observer_with_mutable_state() {
1894        let prop = ObservableProperty::new(0);
1895        let call_history = Arc::new(RwLock::new(Vec::new()));
1896
1897        let history = call_history.clone();
1898        let observer_id = prop
1899            .subscribe(Arc::new(move |old, new| {
1900                if let Ok(mut hist) = history.write() {
1901                    hist.push((*old, *new));
1902                }
1903            }))
1904            .expect("Failed to subscribe mutable state observer");
1905
1906        // Make several changes
1907        prop.set(1).expect("Failed to set property to 1");
1908        prop.set(2).expect("Failed to set property to 2");
1909        prop.set(3).expect("Failed to set property to 3");
1910
1911        // Verify the history was recorded correctly
1912        let history = call_history.read().expect("Failed to read call history");
1913        assert_eq!(history.len(), 3);
1914        assert_eq!(history[0], (0, 1));
1915        assert_eq!(history[1], (1, 2));
1916        assert_eq!(history[2], (2, 3));
1917
1918        prop.unsubscribe(observer_id).expect("Failed to unsubscribe mutable state observer");
1919    }
1920
1921    #[test]
1922    fn test_subscription_automatic_cleanup() {
1923        let prop = ObservableProperty::new(0);
1924        let call_count = Arc::new(AtomicUsize::new(0));
1925
1926        // Test that subscription automatically cleans up when dropped
1927        {
1928            let count = call_count.clone();
1929            let _subscription = prop
1930                .subscribe_with_subscription(Arc::new(move |_, _| {
1931                    count.fetch_add(1, Ordering::SeqCst);
1932                }))
1933                .expect("Failed to create subscription for automatic cleanup test");
1934
1935            // Observer should be active while subscription is in scope
1936            prop.set(1).expect("Failed to set property value in subscription test");
1937            assert_eq!(call_count.load(Ordering::SeqCst), 1);
1938
1939            // Subscription goes out of scope here and should auto-cleanup
1940        }
1941
1942        // Observer should no longer be active after subscription dropped
1943        prop.set(2).expect("Failed to set property value after subscription dropped");
1944        assert_eq!(call_count.load(Ordering::SeqCst), 1); // No additional calls
1945    }
1946
1947    #[test]
1948    fn test_subscription_explicit_drop() {
1949        let prop = ObservableProperty::new(0);
1950        let call_count = Arc::new(AtomicUsize::new(0));
1951
1952        let count = call_count.clone();
1953        let subscription = prop
1954            .subscribe_with_subscription(Arc::new(move |_, _| {
1955                count.fetch_add(1, Ordering::SeqCst);
1956            }))
1957            .expect("Failed to create subscription for explicit drop test");
1958
1959        // Observer should be active
1960        prop.set(1).expect("Failed to set property value before explicit drop");
1961        assert_eq!(call_count.load(Ordering::SeqCst), 1);
1962
1963        // Explicitly drop the subscription
1964        drop(subscription);
1965
1966        // Observer should no longer be active
1967        prop.set(2).expect("Failed to set property value after explicit drop");
1968        assert_eq!(call_count.load(Ordering::SeqCst), 1);
1969    }
1970
1971    #[test]
1972    fn test_multiple_subscriptions_with_cleanup() {
1973        let prop = ObservableProperty::new(0);
1974        let call_count1 = Arc::new(AtomicUsize::new(0));
1975        let call_count2 = Arc::new(AtomicUsize::new(0));
1976        let call_count3 = Arc::new(AtomicUsize::new(0));
1977
1978        let count1 = call_count1.clone();
1979        let count2 = call_count2.clone();
1980        let count3 = call_count3.clone();
1981
1982        let subscription1 = prop
1983            .subscribe_with_subscription(Arc::new(move |_, _| {
1984                count1.fetch_add(1, Ordering::SeqCst);
1985            }))
1986            .expect("Failed to create first subscription");
1987
1988        let subscription2 = prop
1989            .subscribe_with_subscription(Arc::new(move |_, _| {
1990                count2.fetch_add(1, Ordering::SeqCst);
1991            }))
1992            .expect("Failed to create second subscription");
1993
1994        let subscription3 = prop
1995            .subscribe_with_subscription(Arc::new(move |_, _| {
1996                count3.fetch_add(1, Ordering::SeqCst);
1997            }))
1998            .expect("Failed to create third subscription");
1999
2000        // All observers should be active
2001        prop.set(1).expect("Failed to set property value with all subscriptions");
2002        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
2003        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2004        assert_eq!(call_count3.load(Ordering::SeqCst), 1);
2005
2006        // Drop second subscription
2007        drop(subscription2);
2008
2009        // Only first and third should be active
2010        prop.set(2).expect("Failed to set property value with partial subscriptions");
2011        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2012        assert_eq!(call_count2.load(Ordering::SeqCst), 1); // No change
2013        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2014
2015        // Drop remaining subscriptions
2016        drop(subscription1);
2017        drop(subscription3);
2018
2019        // No observers should be active
2020        prop.set(3).expect("Failed to set property value with no subscriptions");
2021        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2022        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2023        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2024    }
2025
2026    #[test]
2027    fn test_subscription_drop_with_poisoned_lock() {
2028        let prop = Arc::new(ObservableProperty::new(0));
2029        let prop_clone = prop.clone();
2030
2031        // Create a subscription
2032        let call_count = Arc::new(AtomicUsize::new(0));
2033        let count = call_count.clone();
2034        let subscription = prop
2035            .subscribe_with_subscription(Arc::new(move |_, _| {
2036                count.fetch_add(1, Ordering::SeqCst);
2037            }))
2038            .expect("Failed to create subscription for poisoned lock test");
2039
2040        // Poison the lock by panicking while holding a write lock
2041        let poison_thread = thread::spawn(move || {
2042            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2043            panic!("Deliberate panic to poison the lock");
2044        });
2045        let _ = poison_thread.join(); // Ignore the panic result
2046
2047        // Dropping the subscription should not panic even with poisoned lock
2048        // This tests that the Drop implementation handles poisoned locks gracefully
2049        drop(subscription); // Should complete without panic
2050
2051        // Test passes if we reach here without panicking
2052    }
2053
2054    #[test]
2055    fn test_subscription_vs_manual_unsubscribe() {
2056        let prop = ObservableProperty::new(0);
2057        let auto_count = Arc::new(AtomicUsize::new(0));
2058        let manual_count = Arc::new(AtomicUsize::new(0));
2059
2060        // Manual subscription
2061        let manual_count_clone = manual_count.clone();
2062        let manual_id = prop
2063            .subscribe(Arc::new(move |_, _| {
2064                manual_count_clone.fetch_add(1, Ordering::SeqCst);
2065            }))
2066            .expect("Failed to create manual subscription");
2067
2068        // Automatic subscription
2069        let auto_count_clone = auto_count.clone();
2070        let _auto_subscription = prop
2071            .subscribe_with_subscription(Arc::new(move |_, _| {
2072                auto_count_clone.fetch_add(1, Ordering::SeqCst);
2073            }))
2074            .expect("Failed to create automatic subscription");
2075
2076        // Both should be active
2077        prop.set(1).expect("Failed to set property value with both subscriptions");
2078        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2079        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2080
2081        // Manual unsubscribe
2082        prop.unsubscribe(manual_id).expect("Failed to manually unsubscribe");
2083
2084        // Only automatic subscription should be active
2085        prop.set(2).expect("Failed to set property value after manual unsubscribe");
2086        assert_eq!(manual_count.load(Ordering::SeqCst), 1); // No change
2087        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2088
2089        // Auto subscription goes out of scope here and cleans up automatically
2090    }
2091
2092    #[test]
2093    fn test_subscribe_with_subscription_error_handling() {
2094        let prop = Arc::new(ObservableProperty::new(0));
2095        let prop_clone = prop.clone();
2096
2097        // Poison the lock
2098        let poison_thread = thread::spawn(move || {
2099            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2100            panic!("Deliberate panic to poison the lock");
2101        });
2102        let _ = poison_thread.join();
2103
2104        // subscribe_with_subscription should return an error for poisoned lock
2105        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2106        assert!(result.is_err());
2107        match result.expect_err("Expected error for poisoned lock") {
2108            PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {
2109                // Either error type is acceptable for poisoned lock
2110            }
2111            other => panic!("Unexpected error type: {:?}", other),
2112        }
2113    }
2114
2115    #[test]
2116    fn test_subscription_with_property_cloning() {
2117        let prop1 = ObservableProperty::new(0);
2118        let prop2 = prop1.clone();
2119        let call_count = Arc::new(AtomicUsize::new(0));
2120
2121        // Subscribe to prop1
2122        let count = call_count.clone();
2123        let _subscription = prop1
2124            .subscribe_with_subscription(Arc::new(move |_, _| {
2125                count.fetch_add(1, Ordering::SeqCst);
2126            }))
2127            .expect("Failed to create subscription for cloned property test");
2128
2129        // Changes through prop2 should trigger the observer subscribed to prop1
2130        prop2.set(42).expect("Failed to set property value through prop2");
2131        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2132
2133        // Changes through prop1 should also trigger the observer
2134        prop1.set(100).expect("Failed to set property value through prop1");
2135        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2136    }
2137
2138    #[test]
2139    fn test_subscription_in_conditional_blocks() {
2140        let prop = ObservableProperty::new(0);
2141        let call_count = Arc::new(AtomicUsize::new(0));
2142
2143        let should_subscribe = true;
2144
2145        if should_subscribe {
2146            let count = call_count.clone();
2147            let _subscription = prop
2148                .subscribe_with_subscription(Arc::new(move |_, _| {
2149                    count.fetch_add(1, Ordering::SeqCst);
2150                }))
2151                .expect("Failed to create subscription in conditional block");
2152
2153            // Observer active within this block
2154            prop.set(1).expect("Failed to set property value in conditional block");
2155            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2156
2157            // Subscription dropped when exiting this block
2158        }
2159
2160        // Observer should be inactive now
2161        prop.set(2).expect("Failed to set property value after conditional block");
2162        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2163    }
2164
2165    #[test]
2166    fn test_subscription_with_early_return() {
2167        fn test_function(
2168            prop: &ObservableProperty<i32>,
2169            should_return_early: bool,
2170        ) -> Result<(), PropertyError> {
2171            let call_count = Arc::new(AtomicUsize::new(0));
2172            let count = call_count.clone();
2173
2174            let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2175                count.fetch_add(1, Ordering::SeqCst);
2176            }))?;
2177
2178            prop.set(1)?;
2179            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2180
2181            if should_return_early {
2182                return Ok(()); // Subscription should be cleaned up here
2183            }
2184
2185            prop.set(2)?;
2186            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2187
2188            Ok(())
2189            // Subscription cleaned up when function exits normally
2190        }
2191
2192        let prop = ObservableProperty::new(0);
2193
2194        // Test early return
2195        test_function(&prop, true).expect("Failed to test early return");
2196
2197        // Verify observer is no longer active after early return
2198        prop.set(10).expect("Failed to set property value after early return");
2199
2200        // Test normal exit
2201        test_function(&prop, false).expect("Failed to test normal exit");
2202
2203        // Verify observer is no longer active after normal exit
2204        prop.set(20).expect("Failed to set property value after normal exit");
2205    }
2206
2207    #[test]
2208    fn test_subscription_move_semantics() {
2209        let prop = ObservableProperty::new(0);
2210        let call_count = Arc::new(AtomicUsize::new(0));
2211
2212        let count = call_count.clone();
2213        let subscription = prop
2214            .subscribe_with_subscription(Arc::new(move |_, _| {
2215                count.fetch_add(1, Ordering::SeqCst);
2216            }))
2217            .expect("Failed to create subscription for move semantics test");
2218
2219        // Observer should be active
2220        prop.set(1).expect("Failed to set property value before move");
2221        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2222
2223        // Move subscription to a new variable
2224        let moved_subscription = subscription;
2225
2226        // Observer should still be active after move
2227        prop.set(2).expect("Failed to set property value after move");
2228        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2229
2230        // Drop the moved subscription
2231        drop(moved_subscription);
2232
2233        // Observer should now be inactive
2234        prop.set(3).expect("Failed to set property value after moved subscription drop");
2235        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2236    }
2237
2238    #[test]
2239    fn test_filtered_subscription_automatic_cleanup() {
2240        let prop = ObservableProperty::new(0);
2241        let call_count = Arc::new(AtomicUsize::new(0));
2242
2243        {
2244            let count = call_count.clone();
2245            let _subscription = prop
2246                .subscribe_filtered_with_subscription(
2247                    Arc::new(move |_, _| {
2248                        count.fetch_add(1, Ordering::SeqCst);
2249                    }),
2250                    |old, new| new > old, // Only trigger on increases
2251                )
2252                .expect("Failed to create filtered subscription");
2253
2254            // Should trigger (0 -> 5)
2255            prop.set(5).expect("Failed to set property value to 5 in filtered test");
2256            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2257
2258            // Should NOT trigger (5 -> 3)
2259            prop.set(3).expect("Failed to set property value to 3 in filtered test");
2260            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2261
2262            // Should trigger (3 -> 10)
2263            prop.set(10).expect("Failed to set property value to 10 in filtered test");
2264            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2265
2266            // Subscription goes out of scope here
2267        }
2268
2269        // Observer should be inactive after subscription cleanup
2270        prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
2271        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2272    }
2273
2274    #[test]
2275    fn test_multiple_filtered_subscriptions() {
2276        let prop = ObservableProperty::new(10);
2277        let increase_count = Arc::new(AtomicUsize::new(0));
2278        let decrease_count = Arc::new(AtomicUsize::new(0));
2279        let significant_change_count = Arc::new(AtomicUsize::new(0));
2280
2281        let inc_count = increase_count.clone();
2282        let dec_count = decrease_count.clone();
2283        let sig_count = significant_change_count.clone();
2284
2285        let _increase_sub = prop
2286            .subscribe_filtered_with_subscription(
2287                Arc::new(move |_, _| {
2288                    inc_count.fetch_add(1, Ordering::SeqCst);
2289                }),
2290                |old, new| new > old,
2291            )
2292            .expect("Failed to create increase subscription");
2293
2294        let _decrease_sub = prop
2295            .subscribe_filtered_with_subscription(
2296                Arc::new(move |_, _| {
2297                    dec_count.fetch_add(1, Ordering::SeqCst);
2298                }),
2299                |old, new| new < old,
2300            )
2301            .expect("Failed to create decrease subscription");
2302
2303        let _significant_sub = prop
2304            .subscribe_filtered_with_subscription(
2305                Arc::new(move |_, _| {
2306                    sig_count.fetch_add(1, Ordering::SeqCst);
2307                }),
2308                |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
2309            )
2310            .expect("Failed to create significant change subscription");
2311
2312        // Test increases
2313        prop.set(15).expect("Failed to set property to 15 in multiple filtered test"); // +5: triggers increase, not significant
2314        assert_eq!(increase_count.load(Ordering::SeqCst), 1);
2315        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2316        assert_eq!(significant_change_count.load(Ordering::SeqCst), 0);
2317
2318        // Test significant increase
2319        prop.set(25).expect("Failed to set property to 25 in multiple filtered test"); // +10: triggers increase and significant
2320        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2321        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2322        assert_eq!(significant_change_count.load(Ordering::SeqCst), 1);
2323
2324        // Test significant decrease
2325        prop.set(5).expect("Failed to set property to 5 in multiple filtered test"); // -20: triggers decrease and significant
2326        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2327        assert_eq!(decrease_count.load(Ordering::SeqCst), 1);
2328        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2329
2330        // Test small decrease
2331        prop.set(3).expect("Failed to set property to 3 in multiple filtered test"); // -2: triggers decrease, not significant
2332        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2333        assert_eq!(decrease_count.load(Ordering::SeqCst), 2);
2334        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2335
2336        // All subscriptions auto-cleanup when they go out of scope
2337    }
2338
2339    #[test]
2340    fn test_filtered_subscription_complex_filter() {
2341        let prop = ObservableProperty::new(0.0f64);
2342        let call_count = Arc::new(AtomicUsize::new(0));
2343        let values_received = Arc::new(RwLock::new(Vec::new()));
2344
2345        let count = call_count.clone();
2346        let values = values_received.clone();
2347        let _subscription = prop
2348            .subscribe_filtered_with_subscription(
2349                Arc::new(move |old, new| {
2350                    count.fetch_add(1, Ordering::SeqCst);
2351                    if let Ok(mut v) = values.write() {
2352                        v.push((*old, *new));
2353                    }
2354                }),
2355                |old, new| {
2356                    // Complex filter: trigger only when crossing integer boundaries
2357                    // and the change is significant (> 0.5)
2358                    let old_int = old.floor() as i32;
2359                    let new_int = new.floor() as i32;
2360                    old_int != new_int && (new - old).abs() > 0.5_f64
2361                },
2362            )
2363            .expect("Failed to create complex filtered subscription");
2364
2365        // Small changes within same integer - should not trigger
2366        prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
2367        prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
2368        assert_eq!(call_count.load(Ordering::SeqCst), 0);
2369
2370        // Cross integer boundary with significant change - should trigger
2371        prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test"); // Change of 0.6, which is > 0.5
2372        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2373
2374        // Small cross-boundary change - should not trigger
2375        prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
2376        prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test"); // Change of 0.2, less than 0.5
2377        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2378
2379        // Large cross-boundary change - should trigger
2380        prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
2381        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2382
2383        // Verify received values
2384        let values = values_received.read().expect("Failed to read values in complex filter test");
2385        assert_eq!(values.len(), 2);
2386        assert_eq!(values[0], (0.7, 1.3));
2387        assert_eq!(values[1], (2.1, 3.5));
2388    }
2389
2390    #[test]
2391    fn test_filtered_subscription_error_handling() {
2392        let prop = Arc::new(ObservableProperty::new(0));
2393        let prop_clone = prop.clone();
2394
2395        // Poison the lock
2396        let poison_thread = thread::spawn(move || {
2397            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
2398            panic!("Deliberate panic to poison the lock");
2399        });
2400        let _ = poison_thread.join();
2401
2402        // subscribe_filtered_with_subscription should return error for poisoned lock
2403        let result = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2404        assert!(result.is_err());
2405    }
2406
2407    #[test]
2408    fn test_filtered_subscription_vs_manual_filtered() {
2409        let prop = ObservableProperty::new(0);
2410        let auto_count = Arc::new(AtomicUsize::new(0));
2411        let manual_count = Arc::new(AtomicUsize::new(0));
2412
2413        // Manual filtered subscription
2414        let manual_count_clone = manual_count.clone();
2415        let manual_id = prop
2416            .subscribe_filtered(
2417                Arc::new(move |_, _| {
2418                    manual_count_clone.fetch_add(1, Ordering::SeqCst);
2419                }),
2420                |old, new| new > old,
2421            )
2422            .expect("Failed to create manual filtered subscription");
2423
2424        // Automatic filtered subscription
2425        let auto_count_clone = auto_count.clone();
2426        let _auto_subscription = prop
2427            .subscribe_filtered_with_subscription(
2428                Arc::new(move |_, _| {
2429                    auto_count_clone.fetch_add(1, Ordering::SeqCst);
2430                }),
2431                |old, new| new > old,
2432            )
2433            .expect("Failed to create automatic filtered subscription");
2434
2435        // Both should be triggered by increases
2436        prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
2437        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2438        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2439
2440        // Neither should be triggered by decreases
2441        prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
2442        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2443        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2444
2445        // Both should be triggered by increases again
2446        prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
2447        assert_eq!(manual_count.load(Ordering::SeqCst), 2);
2448        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2449
2450        // Manual cleanup
2451        prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");
2452
2453        // Only automatic subscription should be active
2454        prop.set(15).expect("Failed to set property to 15 after manual cleanup");
2455        assert_eq!(manual_count.load(Ordering::SeqCst), 2); // No change
2456        assert_eq!(auto_count.load(Ordering::SeqCst), 3);
2457
2458        // Auto subscription cleaned up when it goes out of scope
2459    }
2460
2461    #[test]
2462    fn test_filtered_subscription_with_panicking_filter() {
2463        let prop = ObservableProperty::new(0);
2464        let call_count = Arc::new(AtomicUsize::new(0));
2465
2466        let count = call_count.clone();
2467        let _subscription = prop
2468            .subscribe_filtered_with_subscription(
2469                Arc::new(move |_, _| {
2470                    count.fetch_add(1, Ordering::SeqCst);
2471                }),
2472                |_, new| {
2473                    if *new == 42 {
2474                        panic!("Filter panic on 42");
2475                    }
2476                    true // Accept all other values
2477                },
2478            )
2479            .expect("Failed to create panicking filter subscription");
2480
2481        // Normal value should work
2482        prop.set(1).expect("Failed to set property to 1 in panicking filter test");
2483        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2484
2485        // Value that causes filter to panic should be handled gracefully
2486        // The behavior here depends on how the filter panic is handled
2487        // In the current implementation, filter panics may cause the observer to not be called
2488        prop.set(42).expect("Failed to set property to 42 in panicking filter test");
2489
2490        // Observer should still work for subsequent normal values
2491        prop.set(2).expect("Failed to set property to 2 after filter panic");
2492        // Note: The exact count here depends on panic handling implementation
2493        // The important thing is that the system doesn't crash
2494    }
2495
2496    #[test]
2497    fn test_subscription_thread_safety() {
2498        let prop = Arc::new(ObservableProperty::new(0));
2499        let num_threads = 8;
2500        let operations_per_thread = 50;
2501        let total_calls = Arc::new(AtomicUsize::new(0));
2502
2503        let handles: Vec<_> = (0..num_threads)
2504            .map(|thread_id| {
2505                let prop_clone = prop.clone();
2506                let calls_clone = total_calls.clone();
2507
2508                thread::spawn(move || {
2509                    let mut local_subscriptions = Vec::new();
2510
2511                    for i in 0..operations_per_thread {
2512                        let calls = calls_clone.clone();
2513                        let subscription = prop_clone
2514                            .subscribe_with_subscription(Arc::new(move |old, new| {
2515                                calls.fetch_add(1, Ordering::SeqCst);
2516                                // Simulate some work
2517                                let _ = thread_id + i + old + new;
2518                            }))
2519                            .expect("Should be able to create subscription");
2520
2521                        local_subscriptions.push(subscription);
2522
2523                        // Trigger observers
2524                        prop_clone
2525                            .set(thread_id * 1000 + i)
2526                            .expect("Should be able to set value");
2527
2528                        // Occasionally drop some subscriptions
2529                        if i % 5 == 0 && !local_subscriptions.is_empty() {
2530                            local_subscriptions.remove(0); // Drop first subscription
2531                        }
2532                    }
2533
2534                    // All remaining subscriptions dropped when vector goes out of scope
2535                })
2536            })
2537            .collect();
2538
2539        // Wait for all threads to complete
2540        for handle in handles {
2541            handle.join().expect("Thread should complete successfully");
2542        }
2543
2544        // Property should still be functional after all the concurrent operations
2545        prop.set(9999).expect("Property should still work");
2546
2547        // We can't easily verify the exact call count due to the complex timing,
2548        // but we can verify that the system didn't crash and is still operational
2549        println!(
2550            "Total observer calls: {}",
2551            total_calls.load(Ordering::SeqCst)
2552        );
2553    }
2554
2555    #[test]
2556    fn test_subscription_cross_thread_drop() {
2557        let prop = Arc::new(ObservableProperty::new(0));
2558        let call_count = Arc::new(AtomicUsize::new(0));
2559
2560        // Create subscription in main thread
2561        let count = call_count.clone();
2562        let subscription = prop
2563            .subscribe_with_subscription(Arc::new(move |_, _| {
2564                count.fetch_add(1, Ordering::SeqCst);
2565            }))
2566            .expect("Failed to create subscription for cross-thread drop test");
2567
2568        // Verify observer is active
2569        prop.set(1).expect("Failed to set property value in cross-thread drop test");
2570        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2571
2572        // Move subscription to another thread and drop it there
2573        let prop_clone = prop.clone();
2574        let call_count_clone = call_count.clone();
2575
2576        let handle = thread::spawn(move || {
2577            // Verify observer is still active in the other thread
2578            prop_clone.set(2).expect("Failed to set property value in other thread");
2579            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2);
2580
2581            // Drop subscription in this thread
2582            drop(subscription);
2583
2584            // Verify observer is no longer active
2585            prop_clone.set(3).expect("Failed to set property value after drop in other thread");
2586            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2); // No change
2587        });
2588
2589        handle.join().expect("Failed to join cross-thread drop test thread");
2590
2591        // Verify observer is still inactive in main thread
2592        prop.set(4).expect("Failed to set property value after thread join");
2593        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2594    }
2595
2596    #[test]
2597    fn test_concurrent_subscription_creation_and_property_changes() {
2598        let prop = Arc::new(ObservableProperty::new(0));
2599        let total_notifications = Arc::new(AtomicUsize::new(0));
2600        let num_subscriber_threads = 4;
2601        let num_setter_threads = 2;
2602        let operations_per_thread = 25;
2603
2604        // Threads that create and destroy subscriptions
2605        let subscriber_handles: Vec<_> = (0..num_subscriber_threads)
2606            .map(|_| {
2607                let prop_clone = prop.clone();
2608                let notifications_clone = total_notifications.clone();
2609
2610                thread::spawn(move || {
2611                    for _ in 0..operations_per_thread {
2612                        let notifications = notifications_clone.clone();
2613                        let _subscription = prop_clone
2614                            .subscribe_with_subscription(Arc::new(move |_, _| {
2615                                notifications.fetch_add(1, Ordering::SeqCst);
2616                            }))
2617                            .expect("Should create subscription");
2618
2619                        // Keep subscription alive for a short time
2620                        thread::sleep(Duration::from_millis(1));
2621
2622                        // Subscription dropped when _subscription goes out of scope
2623                    }
2624                })
2625            })
2626            .collect();
2627
2628        // Threads that continuously change the property value
2629        let setter_handles: Vec<_> = (0..num_setter_threads)
2630            .map(|thread_id| {
2631                let prop_clone = prop.clone();
2632
2633                thread::spawn(move || {
2634                    for i in 0..operations_per_thread * 2 {
2635                        prop_clone
2636                            .set(thread_id * 10000 + i)
2637                            .expect("Should set value");
2638                        thread::sleep(Duration::from_millis(1));
2639                    }
2640                })
2641            })
2642            .collect();
2643
2644        // Wait for all threads to complete
2645        for handle in subscriber_handles
2646            .into_iter()
2647            .chain(setter_handles.into_iter())
2648        {
2649            handle.join().expect("Thread should complete");
2650        }
2651
2652        // System should be stable after concurrent operations
2653        prop.set(99999).expect("Property should still work");
2654
2655        println!(
2656            "Total notifications during concurrent test: {}",
2657            total_notifications.load(Ordering::SeqCst)
2658        );
2659    }
2660
2661    #[test]
2662    fn test_filtered_subscription_thread_safety() {
2663        let prop = Arc::new(ObservableProperty::new(0));
2664        let increase_notifications = Arc::new(AtomicUsize::new(0));
2665        let decrease_notifications = Arc::new(AtomicUsize::new(0));
2666        let num_threads = 6;
2667
2668        let handles: Vec<_> = (0..num_threads)
2669            .map(|thread_id| {
2670                let prop_clone = prop.clone();
2671                let inc_notifications = increase_notifications.clone();
2672                let dec_notifications = decrease_notifications.clone();
2673
2674                thread::spawn(move || {
2675                    // Create increase-only subscription
2676                    let inc_count = inc_notifications.clone();
2677                    let _inc_subscription = prop_clone
2678                        .subscribe_filtered_with_subscription(
2679                            Arc::new(move |_, _| {
2680                                inc_count.fetch_add(1, Ordering::SeqCst);
2681                            }),
2682                            |old, new| new > old,
2683                        )
2684                        .expect("Should create filtered subscription");
2685
2686                    // Create decrease-only subscription
2687                    let dec_count = dec_notifications.clone();
2688                    let _dec_subscription = prop_clone
2689                        .subscribe_filtered_with_subscription(
2690                            Arc::new(move |_, _| {
2691                                dec_count.fetch_add(1, Ordering::SeqCst);
2692                            }),
2693                            |old, new| new < old,
2694                        )
2695                        .expect("Should create filtered subscription");
2696
2697                    // Perform some property changes
2698                    let base_value = thread_id * 100;
2699                    for i in 0..20 {
2700                        let new_value = base_value + (i % 10); // Creates increases and decreases
2701                        prop_clone.set(new_value).expect("Should set value");
2702                        thread::sleep(Duration::from_millis(1));
2703                    }
2704
2705                    // Subscriptions automatically cleaned up when going out of scope
2706                })
2707            })
2708            .collect();
2709
2710        // Wait for all threads
2711        for handle in handles {
2712            handle.join().expect("Thread should complete");
2713        }
2714
2715        // Verify system is still operational
2716        let initial_inc = increase_notifications.load(Ordering::SeqCst);
2717        let initial_dec = decrease_notifications.load(Ordering::SeqCst);
2718
2719        prop.set(1000).expect("Property should still work");
2720        prop.set(2000).expect("Property should still work");
2721
2722        // No new notifications should occur (all subscriptions cleaned up)
2723        assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
2724        assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);
2725
2726        println!(
2727            "Increase notifications: {}, Decrease notifications: {}",
2728            initial_inc, initial_dec
2729        );
2730    }
2731
2732    #[test]
2733    fn test_subscription_with_async_property_changes() {
2734        let prop = Arc::new(ObservableProperty::new(0));
2735        let sync_notifications = Arc::new(AtomicUsize::new(0));
2736        let async_notifications = Arc::new(AtomicUsize::new(0));
2737
2738        // Subscription that tracks sync notifications
2739        let sync_count = sync_notifications.clone();
2740        let _sync_subscription = prop
2741            .subscribe_with_subscription(Arc::new(move |old, new| {
2742                sync_count.fetch_add(1, Ordering::SeqCst);
2743                // Simulate slow observer work
2744                thread::sleep(Duration::from_millis(5));
2745                println!("Sync observer: {} -> {}", old, new);
2746            }))
2747            .expect("Failed to create sync subscription");
2748
2749        // Subscription that tracks async notifications
2750        let async_count = async_notifications.clone();
2751        let _async_subscription = prop
2752            .subscribe_with_subscription(Arc::new(move |old, new| {
2753                async_count.fetch_add(1, Ordering::SeqCst);
2754                println!("Async observer: {} -> {}", old, new);
2755            }))
2756            .expect("Failed to create async subscription");
2757
2758        // Test sync property changes
2759        let start = std::time::Instant::now();
2760        prop.set(1).expect("Failed to set property value 1 in async test");
2761        prop.set(2).expect("Failed to set property value 2 in async test");
2762        let sync_duration = start.elapsed();
2763
2764        // Test async property changes
2765        let start = std::time::Instant::now();
2766        prop.set_async(3).expect("Failed to set property value 3 async");
2767        prop.set_async(4).expect("Failed to set property value 4 async");
2768        let async_duration = start.elapsed();
2769
2770        // Async should be much faster
2771        assert!(async_duration < sync_duration);
2772
2773        // Wait for async observers to complete
2774        thread::sleep(Duration::from_millis(50));
2775
2776        // All observers should have been called
2777        assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
2778        assert_eq!(async_notifications.load(Ordering::SeqCst), 4);
2779
2780        // Subscriptions auto-cleanup when going out of scope
2781    }
2782
2783    #[test]
2784    fn test_subscription_creation_with_poisoned_lock() {
2785        let prop = Arc::new(ObservableProperty::new(0));
2786        let prop_clone = prop.clone();
2787
2788        // Create a valid subscription before poisoning
2789        let call_count = Arc::new(AtomicUsize::new(0));
2790        let count = call_count.clone();
2791        let existing_subscription = prop
2792            .subscribe_with_subscription(Arc::new(move |_, _| {
2793                count.fetch_add(1, Ordering::SeqCst);
2794            }))
2795            .expect("Failed to create subscription before poisoning");
2796
2797        // Poison the lock
2798        let poison_thread = thread::spawn(move || {
2799            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for subscription poison test");
2800            panic!("Deliberate panic to poison the lock");
2801        });
2802        let _ = poison_thread.join();
2803
2804        // New subscription creation should fail
2805        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2806        assert!(result.is_err());
2807
2808        // New filtered subscription creation should also fail
2809        let filtered_result =
2810            prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2811        assert!(filtered_result.is_err());
2812
2813        // Dropping existing subscription should not panic
2814        drop(existing_subscription);
2815    }
2816
2817    #[test]
2818    fn test_subscription_cleanup_behavior_with_poisoned_lock() {
2819        // This test specifically verifies that Drop doesn't panic with poisoned locks
2820        let prop = Arc::new(ObservableProperty::new(0));
2821        let call_count = Arc::new(AtomicUsize::new(0));
2822
2823        // Create subscription
2824        let count = call_count.clone();
2825        let subscription = prop
2826            .subscribe_with_subscription(Arc::new(move |_, _| {
2827                count.fetch_add(1, Ordering::SeqCst);
2828            }))
2829            .expect("Failed to create subscription for cleanup behavior test");
2830
2831        // Verify it works initially
2832        prop.set(1).expect("Failed to set property value in cleanup behavior test");
2833        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2834
2835        // Poison the lock from another thread
2836        let prop_clone = prop.clone();
2837        let poison_thread = thread::spawn(move || {
2838            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for cleanup behavior poison test");
2839            panic!("Deliberate panic to poison the lock");
2840        });
2841        let _ = poison_thread.join();
2842
2843        // Now drop the subscription - this should NOT panic
2844        // The Drop implementation should handle the poisoned lock gracefully
2845        drop(subscription);
2846
2847        // Test succeeds if we reach this point without panicking
2848    }
2849
2850    #[test]
2851    fn test_multiple_subscription_cleanup_with_poisoned_lock() {
2852        let prop = Arc::new(ObservableProperty::new(0));
2853        let mut subscriptions = Vec::new();
2854
2855        // Create multiple subscriptions
2856        for i in 0..5 {
2857            let call_count = Arc::new(AtomicUsize::new(0));
2858            let count = call_count.clone();
2859            let subscription = prop
2860                .subscribe_with_subscription(Arc::new(move |old, new| {
2861                    count.fetch_add(1, Ordering::SeqCst);
2862                    println!("Observer {}: {} -> {}", i, old, new);
2863                }))
2864                .expect("Failed to create subscription in multiple cleanup test");
2865            subscriptions.push(subscription);
2866        }
2867
2868        // Verify they all work
2869        prop.set(42).expect("Failed to set property value in multiple cleanup test");
2870
2871        // Poison the lock
2872        let prop_clone = prop.clone();
2873        let poison_thread = thread::spawn(move || {
2874            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for multiple cleanup poison test");
2875            panic!("Deliberate panic to poison the lock");
2876        });
2877        let _ = poison_thread.join();
2878
2879        // Drop all subscriptions - none should panic
2880        for subscription in subscriptions {
2881            drop(subscription);
2882        }
2883
2884        // Test succeeds if no panics occurred
2885    }
2886
2887    #[test]
2888    fn test_subscription_behavior_before_and_after_poison() {
2889        let prop = Arc::new(ObservableProperty::new(0));
2890        let before_poison_count = Arc::new(AtomicUsize::new(0));
2891        let after_poison_count = Arc::new(AtomicUsize::new(0));
2892
2893        // Create subscription before poisoning
2894        let before_count = before_poison_count.clone();
2895        let before_subscription = prop
2896            .subscribe_with_subscription(Arc::new(move |_, _| {
2897                before_count.fetch_add(1, Ordering::SeqCst);
2898            }))
2899            .expect("Failed to create subscription before poison test");
2900
2901        // Verify it works
2902        prop.set(1).expect("Failed to set property value before poison test");
2903        assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);
2904
2905        // Poison the lock
2906        let prop_clone = prop.clone();
2907        let poison_thread = thread::spawn(move || {
2908            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for before/after poison test");
2909            panic!("Deliberate panic to poison the lock");
2910        });
2911        let _ = poison_thread.join();
2912
2913        // Try to create subscription after poisoning - should fail
2914        let after_count = after_poison_count.clone();
2915        let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2916            after_count.fetch_add(1, Ordering::SeqCst);
2917        }));
2918        assert!(after_result.is_err());
2919
2920        // Clean up the before-poison subscription - should not panic
2921        drop(before_subscription);
2922
2923        // Verify after-poison subscription was never created
2924        assert_eq!(after_poison_count.load(Ordering::SeqCst), 0);
2925    }
2926
2927    #[test]
2928    fn test_concurrent_subscription_drops_with_poison() {
2929        let prop = Arc::new(ObservableProperty::new(0));
2930        let num_subscriptions = 10;
2931        let mut subscriptions = Vec::new();
2932
2933        // Create multiple subscriptions
2934        for i in 0..num_subscriptions {
2935            let call_count = Arc::new(AtomicUsize::new(0));
2936            let count = call_count.clone();
2937            let subscription = prop
2938                .subscribe_with_subscription(Arc::new(move |_, _| {
2939                    count.fetch_add(1, Ordering::SeqCst);
2940                    println!("Observer {}", i);
2941                }))
2942                .expect("Failed to create subscription in concurrent drops test");
2943            subscriptions.push(subscription);
2944        }
2945
2946        // Poison the lock
2947        let prop_clone = prop.clone();
2948        let poison_thread = thread::spawn(move || {
2949            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for concurrent drops poison test");
2950            panic!("Deliberate panic to poison the lock");
2951        });
2952        let _ = poison_thread.join();
2953
2954        // Drop subscriptions concurrently from multiple threads
2955        let handles: Vec<_> = subscriptions
2956            .into_iter()
2957            .enumerate()
2958            .map(|(i, subscription)| {
2959                thread::spawn(move || {
2960                    // Add some randomness to timing
2961                    thread::sleep(Duration::from_millis(i as u64 % 5));
2962                    drop(subscription);
2963                    println!("Dropped subscription {}", i);
2964                })
2965            })
2966            .collect();
2967
2968        // Wait for all drops to complete
2969        for handle in handles {
2970            handle
2971                .join()
2972                .expect("Drop thread should complete without panic");
2973        }
2974
2975        // Test succeeds if all threads completed successfully
2976    }
2977
2978    #[test]
2979    fn test_with_max_threads_creation() {
2980        // Test creation with various thread counts
2981        let prop1 = ObservableProperty::with_max_threads(42, 1);
2982        let prop2 = ObservableProperty::with_max_threads("test".to_string(), 8);
2983        let prop3 = ObservableProperty::with_max_threads(0.5_f64, 16);
2984
2985        // Verify initial values are correct
2986        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 42);
2987        assert_eq!(prop2.get().expect("Failed to get prop2 value"), "test");
2988        assert_eq!(prop3.get().expect("Failed to get prop3 value"), 0.5);
2989    }
2990
2991    #[test]
2992    fn test_with_max_threads_zero_defaults_to_max_threads() {
2993        // Test that zero max_threads defaults to MAX_THREADS (4)
2994        let prop1 = ObservableProperty::with_max_threads(100, 0);
2995        let prop2 = ObservableProperty::new(100); // Uses default MAX_THREADS
2996
2997        // Both should have the same max_threads value
2998        // We can't directly access max_threads, but we can verify behavior is consistent
2999        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 100);
3000        assert_eq!(prop2.get().expect("Failed to get prop2 value"), 100);
3001    }
3002
3003    #[test]
3004    fn test_with_max_threads_basic_functionality() {
3005        let prop = ObservableProperty::with_max_threads(0, 2);
3006        let call_count = Arc::new(AtomicUsize::new(0));
3007
3008        // Subscribe an observer
3009        let count = call_count.clone();
3010        let _subscription = prop
3011            .subscribe_with_subscription(Arc::new(move |old, new| {
3012                count.fetch_add(1, Ordering::SeqCst);
3013                assert_eq!(*old, 0);
3014                assert_eq!(*new, 42);
3015            }))
3016            .expect("Failed to create subscription for max_threads test");
3017
3018        // Test synchronous set
3019        prop.set(42).expect("Failed to set value synchronously");
3020        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3021
3022        // Test asynchronous set
3023        prop.set_async(43).expect("Failed to set value asynchronously");
3024        
3025        // Wait for async observers to complete
3026        thread::sleep(Duration::from_millis(50));
3027        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3028    }
3029
3030    #[test]
3031    fn test_with_max_threads_async_performance() {
3032        // Test that with_max_threads affects async performance
3033        let prop = ObservableProperty::with_max_threads(0, 1); // Single thread
3034        let slow_call_count = Arc::new(AtomicUsize::new(0));
3035
3036        // Add multiple slow observers
3037        let mut subscriptions = Vec::new();
3038        for _ in 0..4 {
3039            let count = slow_call_count.clone();
3040            let subscription = prop
3041                .subscribe_with_subscription(Arc::new(move |_, _| {
3042                    thread::sleep(Duration::from_millis(25)); // Simulate slow work
3043                    count.fetch_add(1, Ordering::SeqCst);
3044                }))
3045                .expect("Failed to create slow observer subscription");
3046            subscriptions.push(subscription);
3047        }
3048
3049        // Measure time for async notification
3050        let start = std::time::Instant::now();
3051        prop.set_async(42).expect("Failed to set value asynchronously");
3052        let async_duration = start.elapsed();
3053
3054        // Should return quickly even with slow observers
3055        assert!(async_duration.as_millis() < 50, "Async set should return quickly");
3056
3057        // Wait for all observers to complete
3058        thread::sleep(Duration::from_millis(200));
3059        assert_eq!(slow_call_count.load(Ordering::SeqCst), 4);
3060    }
3061
3062    #[test]
3063    fn test_with_max_threads_vs_regular_constructor() {
3064        let prop_regular = ObservableProperty::new(42);
3065        let prop_custom = ObservableProperty::with_max_threads(42, 4); // Same as default
3066
3067        let count_regular = Arc::new(AtomicUsize::new(0));
3068        let count_custom = Arc::new(AtomicUsize::new(0));
3069
3070        // Both should behave identically
3071        let count1 = count_regular.clone();
3072        let _sub1 = prop_regular
3073            .subscribe_with_subscription(Arc::new(move |_, _| {
3074                count1.fetch_add(1, Ordering::SeqCst);
3075            }))
3076            .expect("Failed to create regular subscription");
3077
3078        let count2 = count_custom.clone();
3079        let _sub2 = prop_custom
3080            .subscribe_with_subscription(Arc::new(move |_, _| {
3081                count2.fetch_add(1, Ordering::SeqCst);
3082            }))
3083            .expect("Failed to create custom subscription");
3084
3085        // Test sync behavior
3086        prop_regular.set(100).expect("Failed to set regular property");
3087        prop_custom.set(100).expect("Failed to set custom property");
3088
3089        assert_eq!(count_regular.load(Ordering::SeqCst), 1);
3090        assert_eq!(count_custom.load(Ordering::SeqCst), 1);
3091
3092        // Test async behavior
3093        prop_regular.set_async(200).expect("Failed to set regular property async");
3094        prop_custom.set_async(200).expect("Failed to set custom property async");
3095
3096        thread::sleep(Duration::from_millis(50));
3097        assert_eq!(count_regular.load(Ordering::SeqCst), 2);
3098        assert_eq!(count_custom.load(Ordering::SeqCst), 2);
3099    }
3100
3101    #[test]
3102    fn test_with_max_threads_large_values() {
3103        // Test with very large max_threads values
3104        let prop = ObservableProperty::with_max_threads(0, 1000);
3105        let call_count = Arc::new(AtomicUsize::new(0));
3106
3107        // Add a few observers
3108        let count = call_count.clone();
3109        let _subscription = prop
3110            .subscribe_with_subscription(Arc::new(move |_, _| {
3111                count.fetch_add(1, Ordering::SeqCst);
3112            }))
3113            .expect("Failed to create subscription for large max_threads test");
3114
3115        // Should work normally even with excessive thread limit
3116        prop.set_async(42).expect("Failed to set value with large max_threads");
3117
3118        thread::sleep(Duration::from_millis(50));
3119        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3120    }
3121
3122    #[test]
3123    fn test_with_max_threads_clone_behavior() {
3124        let prop1 = ObservableProperty::with_max_threads(42, 2);
3125        let prop2 = prop1.clone();
3126
3127        let call_count1 = Arc::new(AtomicUsize::new(0));
3128        let call_count2 = Arc::new(AtomicUsize::new(0));
3129
3130        // Subscribe to both properties
3131        let count1 = call_count1.clone();
3132        let _sub1 = prop1
3133            .subscribe_with_subscription(Arc::new(move |_, _| {
3134                count1.fetch_add(1, Ordering::SeqCst);
3135            }))
3136            .expect("Failed to create subscription for cloned property test");
3137
3138        let count2 = call_count2.clone();
3139        let _sub2 = prop2
3140            .subscribe_with_subscription(Arc::new(move |_, _| {
3141                count2.fetch_add(1, Ordering::SeqCst);
3142            }))
3143            .expect("Failed to create subscription for original property test");
3144
3145        // Changes through either property should trigger both observers
3146        prop1.set_async(100).expect("Failed to set value through prop1");
3147        thread::sleep(Duration::from_millis(50));
3148        
3149        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
3150        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
3151
3152        prop2.set_async(200).expect("Failed to set value through prop2");
3153        thread::sleep(Duration::from_millis(50));
3154        
3155        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
3156        assert_eq!(call_count2.load(Ordering::SeqCst), 2);
3157    }
3158
3159    #[test]
3160    fn test_with_max_threads_thread_safety() {
3161        let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
3162        let call_count = Arc::new(AtomicUsize::new(0));
3163
3164        // Add observers from multiple threads
3165        let handles: Vec<_> = (0..5)
3166            .map(|thread_id| {
3167                let prop_clone = prop.clone();
3168                let count_clone = call_count.clone();
3169
3170                thread::spawn(move || {
3171                    let count = count_clone.clone();
3172                    let _subscription = prop_clone
3173                        .subscribe_with_subscription(Arc::new(move |_, _| {
3174                            count.fetch_add(1, Ordering::SeqCst);
3175                        }))
3176                        .expect("Failed to create thread-safe subscription");
3177
3178                    // Trigger async notifications from this thread
3179                    prop_clone
3180                        .set_async(thread_id * 10)
3181                        .expect("Failed to set value from thread");
3182                        
3183                    thread::sleep(Duration::from_millis(10));
3184                })
3185            })
3186            .collect();
3187
3188        // Wait for all threads to complete
3189        for handle in handles {
3190            handle.join().expect("Thread should complete successfully");
3191        }
3192
3193        // Wait for all async operations to complete
3194        thread::sleep(Duration::from_millis(100));
3195
3196        // Each set_async should trigger all active observers at that time
3197        // The exact count depends on timing, but should be > 0
3198        assert!(call_count.load(Ordering::SeqCst) > 0);
3199    }
3200
3201    #[test]
3202    fn test_with_max_threads_error_handling() {
3203        let prop = ObservableProperty::with_max_threads(42, 2);
3204        
3205        // Test that error handling works the same as regular properties
3206        let _subscription = prop
3207            .subscribe_with_subscription(Arc::new(|_, _| {
3208                // Normal observer
3209            }))
3210            .expect("Failed to create subscription for error handling test");
3211
3212        // Should handle errors gracefully
3213        assert!(prop.set(100).is_ok());
3214        assert!(prop.set_async(200).is_ok());
3215        assert_eq!(prop.get().expect("Failed to get value after error test"), 200);
3216    }
3217}