observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
14//! - **Graceful lock recovery**: Continues operation even after lock poisoning from panics
15//! - **Memory protection**: Observer limit (10,000) prevents memory exhaustion
16//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
17//!
18//! ## Quick Start
19//!
20//! ```rust
21//! use observable_property::ObservableProperty;
22//! use std::sync::Arc;
23//!
24//! // Create an observable property
25//! let property = ObservableProperty::new(42);
26//!
27//! // Subscribe to changes
28//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
29//!     println!("Value changed from {} to {}", old_value, new_value);
30//! })).map_err(|e| {
31//!     eprintln!("Failed to subscribe: {}", e);
32//!     e
33//! })?;
34//!
35//! // Change the value (triggers observer)
36//! property.set(100).map_err(|e| {
37//!     eprintln!("Failed to set value: {}", e);
38//!     e
39//! })?;
40//!
41//! // Unsubscribe when done
42//! property.unsubscribe(observer_id).map_err(|e| {
43//!     eprintln!("Failed to unsubscribe: {}", e);
44//!     e
45//! })?;
46//! # Ok::<(), observable_property::PropertyError>(())
47//! ```
48//!
49//! ## Multi-threading Example
50//!
51//! ```rust
52//! use observable_property::ObservableProperty;
53//! use std::sync::Arc;
54//! use std::thread;
55//!
56//! let property = Arc::new(ObservableProperty::new(0));
57//! let property_clone = property.clone();
58//!
59//! // Subscribe from one thread
60//! property.subscribe(Arc::new(|old, new| {
61//!     println!("Value changed: {} -> {}", old, new);
62//! })).map_err(|e| {
63//!     eprintln!("Failed to subscribe: {}", e);
64//!     e
65//! })?;
66//!
67//! // Modify from another thread
68//! thread::spawn(move || {
69//!     if let Err(e) = property_clone.set(42) {
70//!         eprintln!("Failed to set value: {}", e);
71//!     }
72//! }).join().expect("Thread panicked");
73//! # Ok::<(), observable_property::PropertyError>(())
74//! ```
75//!
76//! ## RAII Subscriptions (Recommended)
77//!
78//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
79//!
80//! ```rust
81//! use observable_property::ObservableProperty;
82//! use std::sync::Arc;
83//!
84//! # fn main() -> Result<(), observable_property::PropertyError> {
85//! let property = ObservableProperty::new(0);
86//!
87//! {
88//!     // Create RAII subscription - automatically cleaned up when dropped
89//!     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
90//!         println!("Value changed: {} -> {}", old, new);
91//!     }))?;
92//!
93//!     property.set(42)?; // Prints: "Value changed: 0 -> 42"
94//!
95//!     // Subscription automatically unsubscribes when leaving this scope
96//! }
97//!
98//! // No observer active anymore
99//! property.set(100)?; // No output
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! ## Filtered RAII Subscriptions
105//!
106//! Combine filtering with automatic cleanup for conditional monitoring:
107//!
108//! ```rust
109//! use observable_property::ObservableProperty;
110//! use std::sync::Arc;
111//!
112//! # fn main() -> Result<(), observable_property::PropertyError> {
113//! let temperature = ObservableProperty::new(20.0);
114//!
115//! {
116//!     // Monitor only significant temperature increases with automatic cleanup
117//!     let _heat_warning = temperature.subscribe_filtered_with_subscription(
118//!         Arc::new(|old, new| {
119//!             println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
120//!         }),
121//!         |old, new| new > old && (new - old) > 5.0
122//!     )?;
123//!
124//!     temperature.set(22.0)?; // No warning (only 2°C increase)
125//!     temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
126//!
127//!     // Subscription automatically cleaned up here
128//! }
129//!
130//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
131//! # Ok(())
132//! # }
133//! ```
134//!
135//! ## Subscription Management Comparison
136//!
137//! ```rust
138//! use observable_property::ObservableProperty;
139//! use std::sync::Arc;
140//!
141//! # fn main() -> Result<(), observable_property::PropertyError> {
142//! let property = ObservableProperty::new(0);
143//! let observer = Arc::new(|old: &i32, new: &i32| {
144//!     println!("Value: {} -> {}", old, new);
145//! });
146//!
147//! // Method 1: Manual subscription management
148//! let observer_id = property.subscribe(observer.clone())?;
149//! property.set(42)?;
150//! property.unsubscribe(observer_id)?; // Manual cleanup required
151//!
152//! // Method 2: RAII subscription management (recommended)
153//! {
154//!     let _subscription = property.subscribe_with_subscription(observer)?;
155//!     property.set(100)?;
156//!     // Automatic cleanup when _subscription goes out of scope
157//! }
158//! # Ok(())
159//! # }
160//! ```
161//!
162//! ## Advanced RAII Patterns
163//!
164//! Comprehensive example showing various RAII subscription patterns:
165//!
166//! ```rust
167//! use observable_property::ObservableProperty;
168//! use std::sync::Arc;
169//!
170//! # fn main() -> Result<(), observable_property::PropertyError> {
171//! // System monitoring example
172//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
173//! let memory_usage = ObservableProperty::new(1024); // MB
174//! let active_connections = ObservableProperty::new(0u32);
175//!
176//! // Conditional monitoring based on system state
177//! let high_load_monitoring = cpu_usage.get()? > 50.0;
178//!
179//! if high_load_monitoring {
180//!     // Critical system monitoring - active only during high load
181//!     let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
182//!         Arc::new(|old, new| {
183//!             println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
184//!         }),
185//!         |_, new| *new > 80.0
186//!     )?;
187//!
188//!     let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
189//!         Arc::new(|old, new| {
190//!             println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
191//!         }),
192//!         |_, new| *new > 8192 // > 8GB
193//!     )?;
194//!
195//!     // Simulate system load changes
196//!     cpu_usage.set(85.0)?;     // Would trigger critical alert
197//!     memory_usage.set(9216)?;  // Would trigger memory warning
198//!     
199//!     // All monitoring automatically stops when exiting this block
200//! }
201//!
202//! // Connection monitoring with scoped lifetime
203//! {
204//!     let _connection_monitor = active_connections.subscribe_with_subscription(
205//!         Arc::new(|old, new| {
206//!             if new > old {
207//!                 println!("📈 New connections: {} -> {}", old, new);
208//!             } else if new < old {
209//!                 println!("📉 Connections closed: {} -> {}", old, new);
210//!             }
211//!         })
212//!     )?;
213//!
214//!     active_connections.set(5)?;  // Prints: "📈 New connections: 0 -> 5"
215//!     active_connections.set(3)?;  // Prints: "📉 Connections closed: 5 -> 3"
216//!     active_connections.set(8)?;  // Prints: "📈 New connections: 3 -> 8"
217//!
218//!     // Connection monitoring automatically stops here
219//! }
220//!
221//! // No monitoring active anymore
222//! cpu_usage.set(95.0)?;         // No output
223//! memory_usage.set(10240)?;     // No output  
224//! active_connections.set(15)?;  // No output
225//!
226//! println!("All monitoring automatically cleaned up!");
227//! # Ok(())
228//! # }
229//! ```
230
231use std::collections::HashMap;
232use std::fmt;
233use std::panic;
234use std::sync::{Arc, RwLock};
235use std::thread;
236
237/// Maximum number of background threads used for asynchronous observer notifications
238///
239/// This constant controls the degree of parallelism when using `set_async()` to notify
240/// observers. The observer list is divided into batches, with each batch running in
241/// its own background thread, up to this maximum number of threads.
242///
243/// # Rationale
244///
245/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
246/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead  
247/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
248/// - **System Responsiveness**: Limits thread contention on multi-core systems
249///
250/// # Implementation Details
251///
252/// When `set_async()` is called:
253/// 1. All observers are collected into a snapshot
254/// 2. Observers are divided into `MAX_THREADS` batches (or fewer if there are fewer observers)
255/// 3. Each batch executes in its own `thread::spawn()` call
256/// 4. Observers within each batch are executed sequentially
257///
258/// For example, with 100 observers and `MAX_THREADS = 4`:
259/// - Batch 1: Observers 1-25 (Thread 1)
260/// - Batch 2: Observers 26-50 (Thread 2)  
261/// - Batch 3: Observers 51-75 (Thread 3)
262/// - Batch 4: Observers 76-100 (Thread 4)
263///
264/// # Tuning Considerations
265///
266/// This value can be adjusted based on your application's needs:
267/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
268/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
269/// - **Memory-constrained systems**: Lower values reduce thread overhead
270/// - **Real-time systems**: Lower values reduce scheduling unpredictability
271///
272/// # Thread Safety
273///
274/// This constant is used only during the batching calculation and does not affect
275/// the thread safety of the overall system.
276const MAX_THREADS: usize = 4;
277
278/// Maximum number of observers allowed per property instance
279///
280/// This limit prevents memory exhaustion from unbounded observer registration.
281/// Once this limit is reached, attempts to add more observers will fail with
282/// an `InvalidConfiguration` error.
283///
284/// # Rationale
285///
286/// - **Memory Protection**: Prevents unbounded memory growth from observer accumulation
287/// - **Resource Management**: Ensures predictable memory usage in long-running applications
288/// - **Early Detection**: Catches potential memory leaks from forgotten unsubscriptions
289/// - **System Stability**: Prevents out-of-memory conditions in constrained environments
290///
291/// # Tuning Considerations
292///
293/// This value can be adjusted based on your application's needs:
294/// - **High-frequency properties**: May need fewer observers to avoid notification overhead
295/// - **Low-frequency properties**: Can safely support more observers
296/// - **Memory-constrained systems**: Lower values prevent memory pressure
297/// - **Development/testing**: Higher values may be useful for comprehensive test coverage
298///
299/// # Default Value
300///
301/// The default of 10,000 observers provides generous headroom for most applications while
302/// still preventing pathological cases. In practice, most properties have fewer than 100
303/// observers.
304///
305/// # Example Scenarios
306///
307/// - **User sessions**: 1,000 concurrent users, each with 5 properties = ~5,000 observers
308/// - **IoT devices**: 5,000 devices, each with 1-2 observers = ~10,000 observers
309/// - **Monitoring system**: 100 metrics, each with 20 dashboard widgets = ~2,000 observers
310const MAX_OBSERVERS: usize = 10_000;
311
312/// Errors that can occur when working with ObservableProperty
313///
314/// # Note on Lock Poisoning
315///
316/// This implementation uses **graceful degradation** for poisoned locks. When a lock
317/// is poisoned (typically due to a panic in an observer or another thread), the
318/// library automatically recovers the inner value using [`PoisonError::into_inner()`](std::sync::PoisonError::into_inner).
319///
320/// This means:
321/// - **All operations continue to work** even after a lock is poisoned
322/// - No `ReadLockError`, `WriteLockError`, or `PoisonedLock` errors will occur in practice
323/// - The system remains operational and observers continue to function
324///
325/// The error variants are kept for backward compatibility and potential future use cases,
326/// but with the current implementation, poisoned locks are transparent to users.
327///
328/// # Production Benefit
329///
330/// This approach ensures maximum availability and resilience in production systems where
331/// a misbehaving observer shouldn't bring down the entire property system.
332#[derive(Debug, Clone)]
333pub enum PropertyError {
334    /// Failed to acquire a read lock on the property
335    ///
336    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
337    /// as poisoned locks are automatically recovered.
338    ReadLockError {
339        /// Context describing what operation was being attempted
340        context: String,
341    },
342    /// Failed to acquire a write lock on the property
343    ///
344    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
345    /// as poisoned locks are automatically recovered.
346    WriteLockError {
347        /// Context describing what operation was being attempted
348        context: String,
349    },
350    /// Attempted to unsubscribe an observer that doesn't exist
351    ObserverNotFound {
352        /// The ID of the observer that wasn't found
353        id: usize,
354    },
355    /// The property's lock has been poisoned due to a panic in another thread
356    ///
357    /// **Note**: With graceful degradation, this error will not occur in practice
358    /// as the implementation automatically recovers from poisoned locks.
359    PoisonedLock,
360    /// An observer function encountered an error during execution
361    ObserverError {
362        /// Description of what went wrong
363        reason: String,
364    },
365    /// The thread pool for async notifications is exhausted
366    ThreadPoolExhausted,
367    /// Invalid configuration was provided
368    InvalidConfiguration {
369        /// Description of the invalid configuration
370        reason: String,
371    },
372}
373
374impl fmt::Display for PropertyError {
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        match self {
377            PropertyError::ReadLockError { context } => {
378                write!(f, "Failed to acquire read lock: {}", context)
379            }
380            PropertyError::WriteLockError { context } => {
381                write!(f, "Failed to acquire write lock: {}", context)
382            }
383            PropertyError::ObserverNotFound { id } => {
384                write!(f, "Observer with ID {} not found", id)
385            }
386            PropertyError::PoisonedLock => {
387                write!(
388                    f,
389                    "Property is in a poisoned state due to a panic in another thread"
390                )
391            }
392            PropertyError::ObserverError { reason } => {
393                write!(f, "Observer execution failed: {}", reason)
394            }
395            PropertyError::ThreadPoolExhausted => {
396                write!(f, "Thread pool is exhausted and cannot spawn more observers")
397            }
398            PropertyError::InvalidConfiguration { reason } => {
399                write!(f, "Invalid configuration: {}", reason)
400            }
401        }
402    }
403}
404
405impl std::error::Error for PropertyError {}
406
407/// Function type for observers that get called when property values change
408pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
409
410/// Unique identifier for registered observers
411pub type ObserverId = usize;
412
413/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
414///
415/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
416/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
417/// `Drop` implementation automatically removes the associated observer from the property.
418///
419/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
420/// leaks in scenarios where observers might otherwise be forgotten.
421///
422/// # Type Requirements
423///
424/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
425/// - `Clone`: Required for observer notifications
426/// - `Send`: Required for transferring between threads  
427/// - `Sync`: Required for concurrent access from multiple threads
428/// - `'static`: Required for observer callbacks that may outlive the original scope
429///
430/// # Examples
431///
432/// ## Basic RAII Subscription
433///
434/// ```rust
435/// use observable_property::ObservableProperty;
436/// use std::sync::Arc;
437///
438/// # fn main() -> Result<(), observable_property::PropertyError> {
439/// let property = ObservableProperty::new(0);
440///
441/// {
442///     // Create subscription - observer is automatically registered
443///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
444///         println!("Value changed: {} -> {}", old, new);
445///     }))?;
446///
447///     property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
448///
449///     // When _subscription goes out of scope here, observer is automatically removed
450/// }
451///
452/// property.set(100)?; // No observer output - subscription was automatically cleaned up
453/// # Ok(())
454/// # }
455/// ```
456///
457/// ## Cross-Thread Subscription Management
458///
459/// ```rust
460/// use observable_property::ObservableProperty;
461/// use std::sync::Arc;
462/// use std::thread;
463///
464/// # fn main() -> Result<(), observable_property::PropertyError> {
465/// let property = Arc::new(ObservableProperty::new(0));
466/// let property_clone = property.clone();
467///
468/// // Create subscription in main thread
469/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
470///     println!("Observed: {} -> {}", old, new);
471/// }))?;
472///
473/// // Move subscription to another thread for cleanup
474/// let handle = thread::spawn(move || {
475///     // Subscription is still active here
476///     let _ = property_clone.set(42); // Will trigger observer
477///     
478///     // When subscription is dropped here (end of thread), observer is cleaned up
479///     drop(subscription);
480/// });
481///
482/// handle.join().unwrap();
483/// 
484/// // Observer is no longer active
485/// property.set(100)?; // No output
486/// # Ok(())
487/// # }
488/// ```
489///
490/// ## Conditional Scoped Subscriptions
491///
492/// ```rust
493/// use observable_property::ObservableProperty;
494/// use std::sync::Arc;
495///
496/// # fn main() -> Result<(), observable_property::PropertyError> {
497/// let counter = ObservableProperty::new(0);
498/// let debug_mode = true;
499///
500/// if debug_mode {
501///     let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
502///         println!("Debug: counter {} -> {}", old, new);
503///     }))?;
504///     
505///     counter.set(1)?; // Prints debug info
506///     counter.set(2)?; // Prints debug info
507///     
508///     // Debug subscription automatically cleaned up when exiting if block
509/// }
510///
511/// counter.set(3)?; // No debug output (subscription was cleaned up)
512/// # Ok(())
513/// # }
514/// ```
515///
516/// # Thread Safety
517///
518/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
519/// sent between threads and the automatic cleanup will work correctly even if the
520/// subscription is dropped from a different thread than where it was created.
521pub struct Subscription<T: Clone + Send + Sync + 'static> {
522    inner: Arc<RwLock<InnerProperty<T>>>,
523    id: ObserverId,
524}
525
526impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
527    /// Debug implementation that shows the subscription ID without exposing internals
528    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529        f.debug_struct("Subscription")
530            .field("id", &self.id)
531            .field("inner", &"[ObservableProperty]")
532            .finish()
533    }
534}
535
536impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
537    /// Automatically removes the associated observer when the subscription is dropped
538    ///
539    /// This implementation provides automatic cleanup by removing the observer
540    /// from the property's observer list when the `Subscription` goes out of scope.
541    ///
542    /// # Error Handling
543    ///
544    /// If the property's lock is poisoned or inaccessible during cleanup, the error
545    /// is silently ignored using the `let _ = ...` pattern. This is intentional
546    /// because:
547    /// 1. Drop implementations should not panic
548    /// 2. If the property is poisoned, it's likely unusable anyway
549    /// 3. There's no meaningful way to handle cleanup errors in a destructor
550    ///
551    /// # Thread Safety
552    ///
553    /// This method is safe to call from any thread, even if the subscription
554    /// was created on a different thread.
555    fn drop(&mut self) {
556        // Graceful degradation: always attempt to clean up, even from poisoned locks
557        match self.inner.write() {
558            Ok(mut guard) => {
559                guard.observers.remove(&self.id);
560            }
561            Err(poisoned) => {
562                // Recover from poisoned lock to ensure cleanup happens
563                let mut guard = poisoned.into_inner();
564                guard.observers.remove(&self.id);
565            }
566        }
567    }
568}
569
570/// A thread-safe observable property that notifies observers when its value changes
571///
572/// This type wraps a value of type `T` and allows multiple observers to be notified
573/// whenever the value is modified. All operations are thread-safe and can be called
574/// from multiple threads concurrently.
575///
576/// # Type Requirements
577///
578/// The generic type `T` must implement:
579/// - `Clone`: Required for returning values and passing them to observers
580/// - `Send`: Required for transferring between threads
581/// - `Sync`: Required for concurrent access from multiple threads  
582/// - `'static`: Required for observer callbacks that may outlive the original scope
583///
584/// # Examples
585///
586/// ```rust
587/// use observable_property::ObservableProperty;
588/// use std::sync::Arc;
589///
590/// let property = ObservableProperty::new("initial".to_string());
591///
592/// let observer_id = property.subscribe(Arc::new(|old, new| {
593///     println!("Changed from '{}' to '{}'", old, new);
594/// })).map_err(|e| {
595///     eprintln!("Failed to subscribe: {}", e);
596///     e
597/// })?;
598///
599/// property.set("updated".to_string()).map_err(|e| {
600///     eprintln!("Failed to set value: {}", e);
601///     e
602/// })?; // Prints: Changed from 'initial' to 'updated'
603///
604/// property.unsubscribe(observer_id).map_err(|e| {
605///     eprintln!("Failed to unsubscribe: {}", e);
606///     e
607/// })?;
608/// # Ok::<(), observable_property::PropertyError>(())
609/// ```
610pub struct ObservableProperty<T> {
611    inner: Arc<RwLock<InnerProperty<T>>>,
612    max_threads: usize,
613}
614
615struct InnerProperty<T> {
616    value: T,
617    observers: HashMap<ObserverId, Observer<T>>,
618    next_id: ObserverId,
619}
620
621impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
622    /// Creates a new observable property with the given initial value
623    ///
624    /// # Arguments
625    ///
626    /// * `initial_value` - The starting value for this property
627    ///
628    /// # Examples
629    ///
630    /// ```rust
631    /// use observable_property::ObservableProperty;
632    ///
633    /// let property = ObservableProperty::new(42);
634    /// match property.get() {
635    ///     Ok(value) => assert_eq!(value, 42),
636    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
637    /// }
638    /// ```
639    pub fn new(initial_value: T) -> Self {
640        Self {
641            inner: Arc::new(RwLock::new(InnerProperty {
642                value: initial_value,
643                observers: HashMap::new(),
644                next_id: 0,
645            })),
646            max_threads: MAX_THREADS,
647        }
648    }
649
650    /// Creates a new observable property with a custom maximum thread count for async notifications
651    ///
652    /// This constructor allows you to customize the maximum number of threads used for
653    /// asynchronous observer notifications via `set_async()`. This is useful for tuning
654    /// performance based on your specific use case and system constraints.
655    ///
656    /// # Arguments
657    ///
658    /// * `initial_value` - The starting value for this property
659    /// * `max_threads` - Maximum number of threads to use for async notifications.
660    ///                   If 0 is provided, defaults to 4.
661    ///
662    /// # Thread Pool Behavior
663    ///
664    /// When `set_async()` is called, observers are divided into batches and each batch
665    /// runs in its own thread, up to the specified maximum. For example:
666    /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
667    /// - With 10 observers and `max_threads = 8`: 10 threads with 1 observer each
668    /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
669    ///
670    /// # Use Cases
671    ///
672    /// ## High-Throughput Systems
673    /// ```rust
674    /// use observable_property::ObservableProperty;
675    ///
676    /// // For systems with many CPU cores and CPU-bound observers
677    /// let property = ObservableProperty::with_max_threads(0, 8);
678    /// ```
679    ///
680    /// ## Resource-Constrained Systems
681    /// ```rust
682    /// use observable_property::ObservableProperty;
683    ///
684    /// // For embedded systems or memory-constrained environments
685    /// let property = ObservableProperty::with_max_threads(42, 1);
686    /// ```
687    ///
688    /// ## I/O-Heavy Observers
689    /// ```rust
690    /// use observable_property::ObservableProperty;
691    ///
692    /// // For observers that do network/database operations
693    /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
694    /// ```
695    ///
696    /// # Performance Considerations
697    ///
698    /// - **Higher values**: Better parallelism but more thread overhead and memory usage
699    /// - **Lower values**: Less overhead but potentially slower async notifications
700    /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
701    /// - **Zero value**: Automatically uses the default value (4)
702    ///
703    /// # Thread Safety
704    ///
705    /// This setting only affects async notifications (`set_async()`). Synchronous
706    /// operations (`set()`) always execute observers sequentially regardless of this setting.
707    ///
708    /// # Examples
709    ///
710    /// ```rust
711    /// use observable_property::ObservableProperty;
712    /// use std::sync::Arc;
713    ///
714    /// // Create property with custom thread pool size
715    /// let property = ObservableProperty::with_max_threads(42, 2);
716    ///
717    /// // Subscribe observers as usual
718    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
719    ///     println!("Value changed: {} -> {}", old, new);
720    /// })).expect("Failed to create subscription");
721    ///
722    /// // Async notifications will use at most 2 threads
723    /// property.set_async(100).expect("Failed to set value asynchronously");
724    /// ```
725    pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
726        let max_threads = if max_threads == 0 {
727            MAX_THREADS
728        } else {
729            max_threads
730        };
731        Self {
732            inner: Arc::new(RwLock::new(InnerProperty {
733                value: initial_value,
734                observers: HashMap::new(),
735                next_id: 0,
736            })),
737            max_threads,
738        }
739    }
740
741    /// Gets the current value of the property
742    ///
743    /// This method acquires a read lock, which allows multiple concurrent readers
744    /// but will block if a writer currently holds the lock.
745    ///
746    /// # Returns
747    ///
748    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
749    /// if the lock is poisoned.
750    ///
751    /// # Examples
752    ///
753    /// ```rust
754    /// use observable_property::ObservableProperty;
755    ///
756    /// let property = ObservableProperty::new("hello".to_string());
757    /// match property.get() {
758    ///     Ok(value) => assert_eq!(value, "hello"),
759    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
760    /// }
761    /// ```
762    pub fn get(&self) -> Result<T, PropertyError> {
763        match self.inner.read() {
764            Ok(prop) => Ok(prop.value.clone()),
765            Err(poisoned) => {
766                // Graceful degradation: recover value from poisoned lock
767                // This allows continued operation even after a panic in another thread
768                Ok(poisoned.into_inner().value.clone())
769            }
770        }
771    }
772
773    /// Sets the property to a new value and notifies all observers
774    ///
775    /// This method will:
776    /// 1. Acquire a write lock (blocking other readers/writers)
777    /// 2. Update the value and capture a snapshot of observers
778    /// 3. Release the lock
779    /// 4. Notify all observers sequentially with the old and new values
780    ///
781    /// Observer notifications are wrapped in panic recovery to prevent one
782    /// misbehaving observer from affecting others.
783    ///
784    /// # Arguments
785    ///
786    /// * `new_value` - The new value to set
787    ///
788    /// # Returns
789    ///
790    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
791    ///
792    /// # Examples
793    ///
794    /// ```rust
795    /// use observable_property::ObservableProperty;
796    /// use std::sync::Arc;
797    ///
798    /// let property = ObservableProperty::new(10);
799    ///
800    /// property.subscribe(Arc::new(|old, new| {
801    ///     println!("Value changed from {} to {}", old, new);
802    /// })).map_err(|e| {
803    ///     eprintln!("Failed to subscribe: {}", e);
804    ///     e
805    /// })?;
806    ///
807    /// property.set(20).map_err(|e| {
808    ///     eprintln!("Failed to set property value: {}", e);
809    ///     e
810    /// })?; // Triggers observer notification
811    /// # Ok::<(), observable_property::PropertyError>(())
812    /// ```
813    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
814        let (old_value, observers_snapshot) = {
815            let mut prop = match self.inner.write() {
816                Ok(guard) => guard,
817                Err(poisoned) => {
818                    // Graceful degradation: recover from poisoned write lock
819                    // Clear the poison flag by taking ownership of the inner value
820                    poisoned.into_inner()
821                }
822            };
823
824            let old_value = prop.value.clone();
825            prop.value = new_value.clone();
826            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
827            (old_value, observers_snapshot)
828        };
829
830        for observer in observers_snapshot {
831            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
832                observer(&old_value, &new_value);
833            })) {
834                eprintln!("Observer panic: {:?}", e);
835            }
836        }
837
838        Ok(())
839    }
840
841    /// Sets the property to a new value and notifies observers asynchronously
842    ///
843    /// This method is similar to `set()` but spawns observers in background threads
844    /// for non-blocking operation. This is useful when observers might perform
845    /// time-consuming operations.
846    ///
847    /// Observers are batched into groups and each batch runs in its own thread
848    /// to limit resource usage while still providing parallelism.
849    ///
850    /// # Thread Management (Fire-and-Forget Pattern)
851    ///
852    /// **Important**: This method uses a fire-and-forget pattern. Spawned threads are
853    /// **not joined** and run independently in the background. This design is intentional
854    /// for non-blocking behavior but has important implications:
855    ///
856    /// ## Characteristics:
857    /// - ✅ **Non-blocking**: Returns immediately without waiting for observers
858    /// - ✅ **High performance**: No synchronization overhead
859    /// - ⚠️ **No completion guarantee**: Thread may still be running when method returns
860    /// - ⚠️ **No error propagation**: Observer errors are logged but not returned
861    /// - ⚠️ **Testing caveat**: May need explicit delays to observe side effects
862    ///
863    /// ## Use Cases:
864    /// - **UI updates**: Fire updates without blocking the main thread
865    /// - **Logging**: Asynchronous logging that doesn't block operations
866    /// - **Metrics**: Non-critical telemetry that can be lost
867    /// - **Notifications**: Fire-and-forget alerts or messages
868    ///
869    /// ## When NOT to Use:
870    /// - **Critical operations**: Use `set()` if you need guarantees
871    /// - **Transactional updates**: Use `set()` for atomic operations
872    /// - **Sequential dependencies**: If next operation depends on observer completion
873    ///
874    /// ## Testing Considerations:
875    /// ```rust
876    /// use observable_property::ObservableProperty;
877    /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
878    /// use std::time::Duration;
879    ///
880    /// # fn main() -> Result<(), observable_property::PropertyError> {
881    /// let property = ObservableProperty::new(0);
882    /// let was_called = Arc::new(AtomicBool::new(false));
883    /// let flag = was_called.clone();
884    ///
885    /// property.subscribe(Arc::new(move |_, _| {
886    ///     flag.store(true, Ordering::SeqCst);
887    /// }))?;
888    ///
889    /// property.set_async(42)?;
890    ///
891    /// // ⚠️ Immediate check might fail - thread may not have run yet
892    /// // assert!(was_called.load(Ordering::SeqCst)); // May fail!
893    ///
894    /// // ✅ Add a small delay to allow background thread to complete
895    /// std::thread::sleep(Duration::from_millis(10));
896    /// assert!(was_called.load(Ordering::SeqCst)); // Now reliable
897    /// # Ok(())
898    /// # }
899    /// ```
900    ///
901    /// # Arguments
902    ///
903    /// * `new_value` - The new value to set
904    ///
905    /// # Returns
906    ///
907    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
908    /// Note that this only indicates the property was updated successfully;
909    /// observer execution happens asynchronously and errors are not returned.
910    ///
911    /// # Examples
912    ///
913    /// ## Basic Usage
914    ///
915    /// ```rust
916    /// use observable_property::ObservableProperty;
917    /// use std::sync::Arc;
918    /// use std::time::Duration;
919    ///
920    /// let property = ObservableProperty::new(0);
921    ///
922    /// property.subscribe(Arc::new(|old, new| {
923    ///     // This observer does slow work but won't block the caller
924    ///     std::thread::sleep(Duration::from_millis(100));
925    ///     println!("Slow observer: {} -> {}", old, new);
926    /// })).map_err(|e| {
927    ///     eprintln!("Failed to subscribe: {}", e);
928    ///     e
929    /// })?;
930    ///
931    /// // This returns immediately even though observer is slow
932    /// property.set_async(42).map_err(|e| {
933    ///     eprintln!("Failed to set value asynchronously: {}", e);
934    ///     e
935    /// })?;
936    ///
937    /// // Continue working immediately - observer runs in background
938    /// println!("Main thread continues without waiting");
939    /// # Ok::<(), observable_property::PropertyError>(())
940    /// ```
941    ///
942    /// ## Multiple Rapid Updates
943    ///
944    /// ```rust
945    /// use observable_property::ObservableProperty;
946    /// use std::sync::Arc;
947    ///
948    /// # fn main() -> Result<(), observable_property::PropertyError> {
949    /// let property = ObservableProperty::new(0);
950    ///
951    /// property.subscribe(Arc::new(|old, new| {
952    ///     // Expensive operation (e.g., database update, API call)
953    ///     println!("Processing: {} -> {}", old, new);
954    /// }))?;
955    ///
956    /// // All of these return immediately - observers run in parallel
957    /// property.set_async(1)?;
958    /// property.set_async(2)?;
959    /// property.set_async(3)?;
960    /// property.set_async(4)?;
961    /// property.set_async(5)?;
962    ///
963    /// // All observer calls are now running in background threads
964    /// # Ok(())
965    /// # }
966    /// ```
967    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
968        let (old_value, observers_snapshot) = {
969            let mut prop = match self.inner.write() {
970                Ok(guard) => guard,
971                Err(poisoned) => {
972                    // Graceful degradation: recover from poisoned write lock
973                    poisoned.into_inner()
974                }
975            };
976
977            let old_value = prop.value.clone();
978            prop.value = new_value.clone();
979            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
980            (old_value, observers_snapshot)
981        };
982
983        if observers_snapshot.is_empty() {
984            return Ok(());
985        }
986
987        let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
988
989        // Fire-and-forget pattern: Spawn threads without joining
990        // This is intentional for non-blocking behavior. Observers run independently
991        // and the caller continues immediately without waiting for completion.
992        // Trade-offs:
993        //   ✅ Non-blocking, high performance
994        //   ⚠️ No completion guarantee, no error propagation to caller
995        for batch in observers_snapshot.chunks(observers_per_thread) {
996            let batch_observers = batch.to_vec();
997            let old_val = old_value.clone();
998            let new_val = new_value.clone();
999
1000            thread::spawn(move || {
1001                for observer in batch_observers {
1002                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1003                        observer(&old_val, &new_val);
1004                    })) {
1005                        eprintln!("Observer panic in batch thread: {:?}", e);
1006                    }
1007                }
1008            });
1009            // Thread handle intentionally dropped - fire-and-forget pattern
1010        }
1011
1012        Ok(())
1013    }
1014
1015    /// Subscribes an observer function to be called when the property changes
1016    ///
1017    /// The observer function will be called with the old and new values whenever
1018    /// the property is modified via `set()` or `set_async()`.
1019    ///
1020    /// # Arguments
1021    ///
1022    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1023    ///
1024    /// # Returns
1025    ///
1026    /// `Ok(ObserverId)` containing a unique identifier for this observer,
1027    /// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1028    ///
1029    /// # Observer Limit
1030    ///
1031    /// To prevent memory exhaustion, there is a maximum limit of observers per property
1032    /// (currently set to 10,000). If you attempt to add more observers than this limit,
1033    /// the subscription will fail with an `InvalidConfiguration` error.
1034    ///
1035    /// This protection helps prevent:
1036    /// - Memory leaks from forgotten unsubscriptions
1037    /// - Unbounded memory growth in long-running applications
1038    /// - Out-of-memory conditions in resource-constrained environments
1039    ///
1040    /// # Examples
1041    ///
1042    /// ```rust
1043    /// use observable_property::ObservableProperty;
1044    /// use std::sync::Arc;
1045    ///
1046    /// let property = ObservableProperty::new(0);
1047    ///
1048    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
1049    ///     println!("Property changed from {} to {}", old_value, new_value);
1050    /// })).map_err(|e| {
1051    ///     eprintln!("Failed to subscribe observer: {}", e);
1052    ///     e
1053    /// })?;
1054    ///
1055    /// // Later, unsubscribe using the returned ID
1056    /// property.unsubscribe(observer_id).map_err(|e| {
1057    ///     eprintln!("Failed to unsubscribe observer: {}", e);
1058    ///     e
1059    /// })?;
1060    /// # Ok::<(), observable_property::PropertyError>(())
1061    /// ```
1062    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
1063        let mut prop = match self.inner.write() {
1064            Ok(guard) => guard,
1065            Err(poisoned) => {
1066                // Graceful degradation: recover from poisoned write lock
1067                poisoned.into_inner()
1068            }
1069        };
1070
1071        // Check observer limit to prevent memory exhaustion
1072        if prop.observers.len() >= MAX_OBSERVERS {
1073            return Err(PropertyError::InvalidConfiguration {
1074                reason: format!(
1075                    "Maximum observer limit ({}) exceeded. Current observers: {}. \
1076                     Consider unsubscribing unused observers to free resources.",
1077                    MAX_OBSERVERS,
1078                    prop.observers.len()
1079                ),
1080            });
1081        }
1082
1083        let id = prop.next_id;
1084        prop.next_id += 1;
1085        prop.observers.insert(id, observer);
1086        Ok(id)
1087    }
1088
1089    /// Removes an observer identified by its ID
1090    ///
1091    /// # Arguments
1092    ///
1093    /// * `id` - The observer ID returned by `subscribe()`
1094    ///
1095    /// # Returns
1096    ///
1097    /// `Ok(bool)` where `true` means the observer was found and removed,
1098    /// `false` means no observer with that ID existed.
1099    /// Returns `Err(PropertyError)` if the lock is poisoned.
1100    ///
1101    /// # Examples
1102    ///
1103    /// ```rust
1104    /// use observable_property::ObservableProperty;
1105    /// use std::sync::Arc;
1106    ///
1107    /// let property = ObservableProperty::new(0);
1108    /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
1109    ///     eprintln!("Failed to subscribe: {}", e);
1110    ///     e
1111    /// })?;
1112    ///
1113    /// let was_removed = property.unsubscribe(id).map_err(|e| {
1114    ///     eprintln!("Failed to unsubscribe: {}", e);
1115    ///     e
1116    /// })?;
1117    /// assert!(was_removed); // Observer existed and was removed
1118    ///
1119    /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
1120    ///     eprintln!("Failed to unsubscribe again: {}", e);
1121    ///     e
1122    /// })?;
1123    /// assert!(!was_removed_again); // Observer no longer exists
1124    /// # Ok::<(), observable_property::PropertyError>(())
1125    /// ```
1126    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
1127        let mut prop = match self.inner.write() {
1128            Ok(guard) => guard,
1129            Err(poisoned) => {
1130                // Graceful degradation: recover from poisoned write lock
1131                poisoned.into_inner()
1132            }
1133        };
1134
1135        let was_present = prop.observers.remove(&id).is_some();
1136        Ok(was_present)
1137    }
1138
1139    /// Subscribes an observer that only gets called when a filter condition is met
1140    ///
1141    /// This is useful for observing only specific types of changes, such as
1142    /// when a value increases or crosses a threshold.
1143    ///
1144    /// # Arguments
1145    ///
1146    /// * `observer` - The observer function to call when the filter passes
1147    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1148    ///
1149    /// # Returns
1150    ///
1151    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
1152    ///
1153    /// # Examples
1154    ///
1155    /// ```rust
1156    /// use observable_property::ObservableProperty;
1157    /// use std::sync::Arc;
1158    ///
1159    /// let property = ObservableProperty::new(0);
1160    ///
1161    /// // Only notify when value increases
1162    /// let id = property.subscribe_filtered(
1163    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1164    ///     |old, new| new > old
1165    /// ).map_err(|e| {
1166    ///     eprintln!("Failed to subscribe filtered observer: {}", e);
1167    ///     e
1168    /// })?;
1169    ///
1170    /// property.set(10).map_err(|e| {
1171    ///     eprintln!("Failed to set value: {}", e);
1172    ///     e
1173    /// })?; // Triggers observer (0 -> 10)
1174    /// property.set(5).map_err(|e| {
1175    ///     eprintln!("Failed to set value: {}", e);
1176    ///     e
1177    /// })?;  // Does NOT trigger observer (10 -> 5)
1178    /// property.set(15).map_err(|e| {
1179    ///     eprintln!("Failed to set value: {}", e);
1180    ///     e
1181    /// })?; // Triggers observer (5 -> 15)
1182    /// # Ok::<(), observable_property::PropertyError>(())
1183    /// ```
1184    pub fn subscribe_filtered<F>(
1185        &self,
1186        observer: Observer<T>,
1187        filter: F,
1188    ) -> Result<ObserverId, PropertyError>
1189    where
1190        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1191    {
1192        let filter = Arc::new(filter);
1193        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1194            if filter(old_val, new_val) {
1195                observer(old_val, new_val);
1196            }
1197        });
1198
1199        self.subscribe(filtered_observer)
1200    }
1201
1202    /// Notifies all observers with a batch of changes
1203    ///
1204    /// This method allows you to trigger observer notifications for multiple
1205    /// value changes efficiently. Unlike individual `set()` calls, this method
1206    /// acquires the observer list once and then notifies all observers with each
1207    /// change in the batch.
1208    ///
1209    /// # Performance Characteristics
1210    ///
1211    /// - **Lock optimization**: Acquires read lock only to snapshot observers, then releases it
1212    /// - **Non-blocking**: Other operations can proceed during observer notifications
1213    /// - **Panic isolation**: Individual observer panics don't affect other observers
1214    ///
1215    /// # Arguments
1216    ///
1217    /// * `changes` - A vector of tuples `(old_value, new_value)` to notify observers about
1218    ///
1219    /// # Returns
1220    ///
1221    /// `Ok(())` if successful. Observer errors are logged but don't cause the method to fail.
1222    ///
1223    /// # Examples
1224    ///
1225    /// ```rust
1226    /// use observable_property::ObservableProperty;
1227    /// use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
1228    ///
1229    /// # fn main() -> Result<(), observable_property::PropertyError> {
1230    /// let property = ObservableProperty::new(0);
1231    /// let call_count = Arc::new(AtomicUsize::new(0));
1232    /// let count_clone = call_count.clone();
1233    ///
1234    /// property.subscribe(Arc::new(move |old, new| {
1235    ///     count_clone.fetch_add(1, Ordering::SeqCst);
1236    ///     println!("Change: {} -> {}", old, new);
1237    /// }))?;
1238    ///
1239    /// // Notify with multiple changes at once
1240    /// property.notify_observers_batch(vec![
1241    ///     (0, 10),
1242    ///     (10, 20),
1243    ///     (20, 30),
1244    /// ])?;
1245    ///
1246    /// assert_eq!(call_count.load(Ordering::SeqCst), 3);
1247    /// # Ok(())
1248    /// # }
1249    /// ```
1250    ///
1251    /// # Note
1252    ///
1253    /// This method does NOT update the property's actual value - it only triggers
1254    /// observer notifications. Use `set()` if you want to update the value and
1255    /// notify observers.
1256    pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1257        // Acquire lock, clone observers, then release lock immediately
1258        // This prevents blocking other operations during potentially long notification process
1259        let observers_snapshot = {
1260            let prop = match self.inner.read() {
1261                Ok(guard) => guard,
1262                Err(poisoned) => {
1263                    // Graceful degradation: recover from poisoned read lock
1264                    poisoned.into_inner()
1265                }
1266            };
1267            prop.observers.values().cloned().collect::<Vec<_>>()
1268        }; // Lock released here
1269
1270        // Notify observers without holding the lock
1271        for (old_val, new_val) in changes {
1272            for observer in &observers_snapshot {
1273                // Wrap in panic recovery like other notification methods
1274                if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1275                    observer(&old_val, &new_val);
1276                })) {
1277                    eprintln!("Observer panic in batch notification: {:?}", e);
1278                }
1279            }
1280        }
1281        Ok(())
1282    }
1283
1284    /// Subscribes an observer and returns a RAII guard for automatic cleanup
1285    ///
1286    /// This method is similar to `subscribe()` but returns a `Subscription` object
1287    /// that automatically removes the observer when it goes out of scope. This
1288    /// provides a more convenient and safer alternative to manual subscription
1289    /// management.
1290    ///
1291    /// # Arguments
1292    ///
1293    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1294    ///
1295    /// # Returns
1296    ///
1297    /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
1298    /// or `Err(PropertyError)` if the lock is poisoned.
1299    ///
1300    /// # Examples
1301    ///
1302    /// ## Basic RAII Subscription
1303    ///
1304    /// ```rust
1305    /// use observable_property::ObservableProperty;
1306    /// use std::sync::Arc;
1307    ///
1308    /// # fn main() -> Result<(), observable_property::PropertyError> {
1309    /// let property = ObservableProperty::new(0);
1310    ///
1311    /// {
1312    ///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1313    ///         println!("Value: {} -> {}", old, new);
1314    ///     }))?;
1315    ///
1316    ///     property.set(42)?; // Prints: "Value: 0 -> 42"
1317    ///     property.set(100)?; // Prints: "Value: 42 -> 100"
1318    ///
1319    ///     // Automatic cleanup when _subscription goes out of scope
1320    /// }
1321    ///
1322    /// property.set(200)?; // No output - subscription was cleaned up
1323    /// # Ok(())
1324    /// # }
1325    /// ```
1326    ///
1327    /// ## Comparison with Manual Management
1328    ///
1329    /// ```rust
1330    /// use observable_property::ObservableProperty;
1331    /// use std::sync::Arc;
1332    ///
1333    /// # fn main() -> Result<(), observable_property::PropertyError> {
1334    /// let property = ObservableProperty::new("initial".to_string());
1335    ///
1336    /// // Method 1: Manual subscription management (traditional approach)
1337    /// let observer_id = property.subscribe(Arc::new(|old, new| {
1338    ///     println!("Manual: {} -> {}", old, new);
1339    /// }))?;
1340    ///
1341    /// // Method 2: RAII subscription management (recommended)
1342    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1343    ///     println!("RAII: {} -> {}", old, new);
1344    /// }))?;
1345    ///
1346    /// // Both observers will be called
1347    /// property.set("changed".to_string())?;
1348    /// // Prints:
1349    /// // "Manual: initial -> changed"
1350    /// // "RAII: initial -> changed"
1351    ///
1352    /// // Manual cleanup required for first observer
1353    /// property.unsubscribe(observer_id)?;
1354    ///
1355    /// // Second observer (_subscription) is automatically cleaned up when
1356    /// // the variable goes out of scope - no manual intervention needed
1357    /// # Ok(())
1358    /// # }
1359    /// ```
1360    ///
1361    /// ## Error Handling with Early Returns
1362    ///
1363    /// ```rust
1364    /// use observable_property::ObservableProperty;
1365    /// use std::sync::Arc;
1366    ///
1367    /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1368    ///     let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1369    ///         println!("Processing: {} -> {}", old, new);
1370    ///     }))?;
1371    ///
1372    ///     property.set(1)?;
1373    ///     
1374    ///     if property.get()? > 0 {
1375    ///         return Ok(()); // Subscription automatically cleaned up on early return
1376    ///     }
1377    ///
1378    ///     property.set(2)?;
1379    ///     Ok(()) // Subscription automatically cleaned up on normal return
1380    /// }
1381    ///
1382    /// # fn main() -> Result<(), observable_property::PropertyError> {
1383    /// let property = ObservableProperty::new(0);
1384    /// process_with_monitoring(&property)?; // Monitoring active only during function call
1385    /// property.set(99)?; // No monitoring output - subscription was cleaned up
1386    /// # Ok(())
1387    /// # }
1388    /// ```
1389    ///
1390    /// ## Multi-threaded Subscription Management
1391    ///
1392    /// ```rust
1393    /// use observable_property::ObservableProperty;
1394    /// use std::sync::Arc;
1395    /// use std::thread;
1396    ///
1397    /// # fn main() -> Result<(), observable_property::PropertyError> {
1398    /// let property = Arc::new(ObservableProperty::new(0));
1399    /// let property_clone = property.clone();
1400    ///
1401    /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1402    ///     let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1403    ///         println!("Thread observer: {} -> {}", old, new);
1404    ///     }))?;
1405    ///
1406    ///     property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1407    ///     
1408    ///     // Subscription automatically cleaned up when thread ends
1409    ///     Ok(())
1410    /// });
1411    ///
1412    /// handle.join().unwrap()?;
1413    /// property.set(100)?; // No output - thread subscription was cleaned up
1414    /// # Ok(())
1415    /// # }
1416    /// ```
1417    ///
1418    /// # Use Cases
1419    ///
1420    /// This method is particularly useful in scenarios such as:
1421    /// - Temporary observers that should be active only during a specific scope
1422    /// - Error-prone code where manual cleanup might be forgotten
1423    /// - Complex control flow where multiple exit points make manual cleanup difficult
1424    /// - Resource-constrained environments where observer leaks are problematic
1425    pub fn subscribe_with_subscription(
1426        &self,
1427        observer: Observer<T>,
1428    ) -> Result<Subscription<T>, PropertyError> {
1429        let id = self.subscribe(observer)?;
1430        Ok(Subscription {
1431            inner: Arc::clone(&self.inner),
1432            id,
1433        })
1434    }
1435
1436    /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1437    ///
1438    /// This method combines the functionality of `subscribe_filtered()` with the automatic
1439    /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1440    /// called when the filter condition is satisfied, and it will be automatically
1441    /// unsubscribed when the returned `Subscription` goes out of scope.
1442    ///
1443    /// # Arguments
1444    ///
1445    /// * `observer` - The observer function to call when the filter passes
1446    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1447    ///
1448    /// # Returns
1449    ///
1450    /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1451    /// or `Err(PropertyError)` if the lock is poisoned.
1452    ///
1453    /// # Examples
1454    ///
1455    /// ## Basic Filtered RAII Subscription
1456    ///
1457    /// ```rust
1458    /// use observable_property::ObservableProperty;
1459    /// use std::sync::Arc;
1460    ///
1461    /// # fn main() -> Result<(), observable_property::PropertyError> {
1462    /// let counter = ObservableProperty::new(0);
1463    ///
1464    /// {
1465    ///     // Monitor only increases with automatic cleanup
1466    ///     let _increase_monitor = counter.subscribe_filtered_with_subscription(
1467    ///         Arc::new(|old, new| {
1468    ///             println!("Counter increased: {} -> {}", old, new);
1469    ///         }),
1470    ///         |old, new| new > old
1471    ///     )?;
1472    ///
1473    ///     counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
1474    ///     counter.set(3)?;  // No output (decrease)
1475    ///     counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"
1476    ///
1477    ///     // Subscription automatically cleaned up when leaving scope
1478    /// }
1479    ///
1480    /// counter.set(10)?; // No output - subscription was cleaned up
1481    /// # Ok(())
1482    /// # }
1483    /// ```
1484    ///
1485    /// ## Multi-Condition Temperature Monitoring
1486    ///
1487    /// ```rust
1488    /// use observable_property::ObservableProperty;
1489    /// use std::sync::Arc;
1490    ///
1491    /// # fn main() -> Result<(), observable_property::PropertyError> {
1492    /// let temperature = ObservableProperty::new(20.0_f64);
1493    ///
1494    /// {
1495    ///     // Create filtered subscription that only triggers for significant temperature increases
1496    ///     let _heat_warning = temperature.subscribe_filtered_with_subscription(
1497    ///         Arc::new(|old_temp, new_temp| {
1498    ///             println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1499    ///                      old_temp, new_temp);
1500    ///         }),
1501    ///         |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
1502    ///     )?;
1503    ///
1504    ///     // Create another filtered subscription for cooling alerts
1505    ///     let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1506    ///         Arc::new(|old_temp, new_temp| {
1507    ///             println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1508    ///                      old_temp, new_temp);
1509    ///         }),
1510    ///         |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
1511    ///     )?;
1512    ///
1513    ///     // Test the filters
1514    ///     temperature.set(22.0)?; // No alerts (increase of only 2°C)
1515    ///     temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1516    ///     temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1517    ///
1518    ///     // Both subscriptions are automatically cleaned up when they go out of scope
1519    /// }
1520    ///
1521    /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1522    /// # Ok(())
1523    /// # }
1524    /// ```
1525    ///
1526    /// ## Conditional Monitoring with Complex Filters
1527    ///
1528    /// ```rust
1529    /// use observable_property::ObservableProperty;
1530    /// use std::sync::Arc;
1531    ///
1532    /// # fn main() -> Result<(), observable_property::PropertyError> {
1533    /// let stock_price = ObservableProperty::new(100.0_f64);
1534    ///
1535    /// {
1536    ///     // Monitor significant price movements (> 5% change)
1537    ///     let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1538    ///         Arc::new(|old_price, new_price| {
1539    ///             let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1540    ///             println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1541    ///                     old_price, new_price, change_percent);
1542    ///         }),
1543    ///         |old, new| {
1544    ///             let change_percent = ((new - old) / old * 100.0).abs();
1545    ///             change_percent > 5.0  // Trigger on > 5% change
1546    ///         }
1547    ///     )?;
1548    ///
1549    ///     stock_price.set(103.0)?; // No alert (3% change)
1550    ///     stock_price.set(108.0)?; // Alert triggered (4.85% from 103, but let's say it rounds up)
1551    ///     stock_price.set(95.0)?;  // Alert triggered (12% decrease)
1552    ///
1553    ///     // Subscription automatically cleaned up when leaving scope
1554    /// }
1555    ///
1556    /// stock_price.set(200.0)?; // No alert - monitoring ended
1557    /// # Ok(())
1558    /// # }
1559    /// ```
1560    ///
1561    /// ## Cross-Thread Filtered Monitoring
1562    ///
1563    /// ```rust
1564    /// use observable_property::ObservableProperty;
1565    /// use std::sync::Arc;
1566    /// use std::thread;
1567    /// use std::time::Duration;
1568    ///
1569    /// # fn main() -> Result<(), observable_property::PropertyError> {
1570    /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1571    /// let latency_clone = network_latency.clone();
1572    ///
1573    /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1574    ///     // Monitor high latency in background thread with automatic cleanup
1575    ///     let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1576    ///         Arc::new(|old_ms, new_ms| {
1577    ///             println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1578    ///         }),
1579    ///         |_, new| *new > 100  // Alert when latency exceeds 100ms
1580    ///     )?;
1581    ///
1582    ///     // Simulate monitoring for a short time
1583    ///     thread::sleep(Duration::from_millis(10));
1584    ///     
1585    ///     // Subscription automatically cleaned up when thread ends
1586    ///     Ok(())
1587    /// });
1588    ///
1589    /// // Simulate network conditions
1590    /// network_latency.set(80)?;  // No alert (under threshold)
1591    /// network_latency.set(150)?; // Alert triggered in background thread
1592    ///
1593    /// monitor_handle.join().unwrap()?;
1594    /// network_latency.set(200)?; // No alert - background monitoring ended
1595    /// # Ok(())
1596    /// # }
1597    /// ```
1598    ///
1599    /// # Use Cases
1600    ///
1601    /// This method is ideal for:
1602    /// - Threshold-based monitoring with automatic cleanup
1603    /// - Temporary conditional observers in specific code blocks
1604    /// - Event-driven systems where observers should be active only during certain phases
1605    /// - Resource management scenarios where filtered observers have limited lifetimes
1606    ///
1607    /// # Performance Notes
1608    ///
1609    /// The filter function is evaluated for every property change, so it should be
1610    /// lightweight. Complex filtering logic should be optimized to avoid performance
1611    /// bottlenecks, especially in high-frequency update scenarios.
1612    pub fn subscribe_filtered_with_subscription<F>(
1613        &self,
1614        observer: Observer<T>,
1615        filter: F,
1616    ) -> Result<Subscription<T>, PropertyError>
1617    where
1618        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1619    {
1620        let id = self.subscribe_filtered(observer, filter)?;
1621        Ok(Subscription {
1622            inner: Arc::clone(&self.inner),
1623            id,
1624        })
1625    }
1626}
1627
1628impl<T: Clone> Clone for ObservableProperty<T> {
1629    /// Creates a new reference to the same observable property
1630    ///
1631    /// This creates a new `ObservableProperty` instance that shares the same
1632    /// underlying data with the original. Changes made through either instance
1633    /// will be visible to observers subscribed through both instances.
1634    ///
1635    /// # Examples
1636    ///
1637    /// ```rust
1638    /// use observable_property::ObservableProperty;
1639    /// use std::sync::Arc;
1640    ///
1641    /// let property1 = ObservableProperty::new(42);
1642    /// let property2 = property1.clone();
1643    ///
1644    /// property2.subscribe(Arc::new(|old, new| {
1645    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
1646    /// })).map_err(|e| {
1647    ///     eprintln!("Failed to subscribe: {}", e);
1648    ///     e
1649    /// })?;
1650    ///
1651    /// // This change through property1 will trigger the observer on property2
1652    /// property1.set(100).map_err(|e| {
1653    ///     eprintln!("Failed to set value: {}", e);
1654    ///     e
1655    /// })?;
1656    /// # Ok::<(), observable_property::PropertyError>(())
1657    /// ```
1658    fn clone(&self) -> Self {
1659        Self {
1660            inner: Arc::clone(&self.inner),
1661            max_threads: self.max_threads,
1662        }
1663    }
1664}
1665
1666impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1667    /// Debug implementation that shows the current value if accessible
1668    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1669        match self.get() {
1670            Ok(value) => f
1671                .debug_struct("ObservableProperty")
1672                .field("value", &value)
1673                .field("observers_count", &"[hidden]")
1674                .field("max_threads", &self.max_threads)
1675                .finish(),
1676            Err(_) => f
1677                .debug_struct("ObservableProperty")
1678                .field("value", &"[inaccessible]")
1679                .field("observers_count", &"[hidden]")
1680                .finish(),
1681        }
1682    }
1683}
1684
1685#[cfg(test)]
1686mod tests {
1687    use super::*;
1688    use std::sync::atomic::{AtomicUsize, Ordering};
1689    use std::time::Duration;
1690
1691    #[test]
1692    fn test_property_creation_and_basic_operations() {
1693        let prop = ObservableProperty::new(42);
1694
1695        // Test initial value
1696        match prop.get() {
1697            Ok(value) => assert_eq!(value, 42),
1698            Err(e) => panic!("Failed to get initial value: {}", e),
1699        }
1700
1701        // Test setting value
1702        if let Err(e) = prop.set(100) {
1703            panic!("Failed to set value: {}", e);
1704        }
1705
1706        match prop.get() {
1707            Ok(value) => assert_eq!(value, 100),
1708            Err(e) => panic!("Failed to get updated value: {}", e),
1709        }
1710    }
1711
1712    #[test]
1713    fn test_observer_subscription_and_notification() {
1714        let prop = ObservableProperty::new("initial".to_string());
1715        let notification_count = Arc::new(AtomicUsize::new(0));
1716        let last_old_value = Arc::new(RwLock::new(String::new()));
1717        let last_new_value = Arc::new(RwLock::new(String::new()));
1718
1719        let count_clone = notification_count.clone();
1720        let old_clone = last_old_value.clone();
1721        let new_clone = last_new_value.clone();
1722
1723        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
1724            count_clone.fetch_add(1, Ordering::SeqCst);
1725            if let Ok(mut old_val) = old_clone.write() {
1726                *old_val = old.clone();
1727            }
1728            if let Ok(mut new_val) = new_clone.write() {
1729                *new_val = new.clone();
1730            }
1731        })) {
1732            Ok(id) => id,
1733            Err(e) => panic!("Failed to subscribe observer: {}", e),
1734        };
1735
1736        // Change value and verify notification
1737        if let Err(e) = prop.set("changed".to_string()) {
1738            panic!("Failed to set property value: {}", e);
1739        }
1740
1741        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1742
1743        match last_old_value.read() {
1744            Ok(old_val) => assert_eq!(*old_val, "initial"),
1745            Err(e) => panic!("Failed to read old value: {:?}", e),
1746        }
1747
1748        match last_new_value.read() {
1749            Ok(new_val) => assert_eq!(*new_val, "changed"),
1750            Err(e) => panic!("Failed to read new value: {:?}", e),
1751        }
1752
1753        // Test unsubscription
1754        match prop.unsubscribe(observer_id) {
1755            Ok(was_present) => assert!(was_present),
1756            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
1757        }
1758
1759        // Change value again - should not notify
1760        if let Err(e) = prop.set("not_notified".to_string()) {
1761            panic!("Failed to set property value after unsubscribe: {}", e);
1762        }
1763        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1764    }
1765
1766    #[test]
1767    fn test_filtered_observer() {
1768        let prop = ObservableProperty::new(0i32);
1769        let notification_count = Arc::new(AtomicUsize::new(0));
1770        let count_clone = notification_count.clone();
1771
1772        // Observer only triggered when value increases
1773        let observer_id = match prop.subscribe_filtered(
1774            Arc::new(move |_, _| {
1775                count_clone.fetch_add(1, Ordering::SeqCst);
1776            }),
1777            |old, new| new > old,
1778        ) {
1779            Ok(id) => id,
1780            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
1781        };
1782
1783        // Should trigger (0 -> 5)
1784        if let Err(e) = prop.set(5) {
1785            panic!("Failed to set property value to 5: {}", e);
1786        }
1787        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1788
1789        // Should NOT trigger (5 -> 3)
1790        if let Err(e) = prop.set(3) {
1791            panic!("Failed to set property value to 3: {}", e);
1792        }
1793        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1794
1795        // Should trigger (3 -> 10)
1796        if let Err(e) = prop.set(10) {
1797            panic!("Failed to set property value to 10: {}", e);
1798        }
1799        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
1800
1801        match prop.unsubscribe(observer_id) {
1802            Ok(_) => {}
1803            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
1804        }
1805    }
1806
1807    #[test]
1808    fn test_thread_safety_concurrent_reads() {
1809        let prop = Arc::new(ObservableProperty::new(42i32));
1810        let num_threads = 10;
1811        let reads_per_thread = 100;
1812
1813        let handles: Vec<_> = (0..num_threads)
1814            .map(|_| {
1815                let prop_clone = prop.clone();
1816                thread::spawn(move || {
1817                    for _ in 0..reads_per_thread {
1818                        match prop_clone.get() {
1819                            Ok(value) => assert_eq!(value, 42),
1820                            Err(e) => panic!("Failed to read property value: {}", e),
1821                        }
1822                        thread::sleep(Duration::from_millis(1));
1823                    }
1824                })
1825            })
1826            .collect();
1827
1828        for handle in handles {
1829            if let Err(e) = handle.join() {
1830                panic!("Thread failed to complete: {:?}", e);
1831            }
1832        }
1833    }
1834
1835    #[test]
1836    fn test_async_set_performance() {
1837        let prop = ObservableProperty::new(0i32);
1838        let slow_observer_count = Arc::new(AtomicUsize::new(0));
1839        let count_clone = slow_observer_count.clone();
1840
1841        // Add observer that simulates slow work
1842        let _id = match prop.subscribe(Arc::new(move |_, _| {
1843            thread::sleep(Duration::from_millis(50));
1844            count_clone.fetch_add(1, Ordering::SeqCst);
1845        })) {
1846            Ok(id) => id,
1847            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
1848        };
1849
1850        // Test synchronous set (should be slow)
1851        let start = std::time::Instant::now();
1852        if let Err(e) = prop.set(1) {
1853            panic!("Failed to set property value synchronously: {}", e);
1854        }
1855        let sync_duration = start.elapsed();
1856
1857        // Test asynchronous set (should be fast)
1858        let start = std::time::Instant::now();
1859        if let Err(e) = prop.set_async(2) {
1860            panic!("Failed to set property value asynchronously: {}", e);
1861        }
1862        let async_duration = start.elapsed();
1863
1864        // Async should be much faster than sync
1865        assert!(async_duration < sync_duration);
1866        assert!(async_duration.as_millis() < 10); // Should be very fast
1867
1868        // Wait for async observer to complete
1869        thread::sleep(Duration::from_millis(100));
1870
1871        // Both observers should have been called
1872        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
1873    }
1874
1875    #[test]
1876    fn test_lock_poisoning() {
1877        // Create a property that we'll poison
1878        let prop = Arc::new(ObservableProperty::new(0));
1879        let prop_clone = prop.clone();
1880
1881        // Create a thread that will deliberately poison the lock
1882        let poison_thread = thread::spawn(move || {
1883            // Get write lock and then panic, which will poison the lock
1884            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
1885            panic!("Deliberate panic to poison the lock");
1886        });
1887
1888        // Wait for the thread to complete (it will panic)
1889        let _ = poison_thread.join();
1890
1891        // With graceful degradation, operations should succeed even with poisoned locks
1892        // The implementation recovers the inner value using into_inner()
1893        
1894        // get() should succeed by recovering from poisoned lock
1895        match prop.get() {
1896            Ok(value) => assert_eq!(value, 0), // Should recover the value
1897            Err(e) => panic!("get() should succeed with graceful degradation, got error: {:?}", e),
1898        }
1899
1900        // set() should succeed by recovering from poisoned lock
1901        match prop.set(42) {
1902            Ok(_) => {}, // Expected success with graceful degradation
1903            Err(e) => panic!("set() should succeed with graceful degradation, got error: {:?}", e),
1904        }
1905        
1906        // Verify the value was actually set
1907        assert_eq!(prop.get().expect("Failed to get value after set"), 42);
1908
1909        // subscribe() should succeed by recovering from poisoned lock
1910        match prop.subscribe(Arc::new(|_, _| {})) {
1911            Ok(_) => {}, // Expected success with graceful degradation
1912            Err(e) => panic!("subscribe() should succeed with graceful degradation, got error: {:?}", e),
1913        }
1914    }
1915
1916    #[test]
1917    fn test_observer_panic_isolation() {
1918        let prop = ObservableProperty::new(0);
1919        let call_counts = Arc::new(AtomicUsize::new(0));
1920
1921        // First observer will panic
1922        let panic_observer_id = prop
1923            .subscribe(Arc::new(|_, _| {
1924                panic!("This observer deliberately panics");
1925            }))
1926            .expect("Failed to subscribe panic observer");
1927
1928        // Second observer should still be called despite first one panicking
1929        let counts = call_counts.clone();
1930        let normal_observer_id = prop
1931            .subscribe(Arc::new(move |_, _| {
1932                counts.fetch_add(1, Ordering::SeqCst);
1933            }))
1934            .expect("Failed to subscribe normal observer");
1935
1936        // Trigger the observers - this shouldn't panic despite the first observer panicking
1937        prop.set(42).expect("Failed to set property value");
1938
1939        // Verify the second observer was still called
1940        assert_eq!(call_counts.load(Ordering::SeqCst), 1);
1941
1942        // Clean up
1943        prop.unsubscribe(panic_observer_id).expect("Failed to unsubscribe panic observer");
1944        prop.unsubscribe(normal_observer_id).expect("Failed to unsubscribe normal observer");
1945    }
1946
1947    #[test]
1948    fn test_unsubscribe_nonexistent_observer() {
1949        let property = ObservableProperty::new(0);
1950
1951        // Generate a valid observer ID
1952        let valid_id = property.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe test observer");
1953
1954        // Create an ID that doesn't exist (valid_id + 1000 should not exist)
1955        let nonexistent_id = valid_id + 1000;
1956
1957        // Test unsubscribing a nonexistent observer
1958        match property.unsubscribe(nonexistent_id) {
1959            Ok(was_present) => {
1960                assert!(
1961                    !was_present,
1962                    "Unsubscribe should return false for nonexistent ID"
1963                );
1964            }
1965            Err(e) => panic!("Unsubscribe returned error: {:?}", e),
1966        }
1967
1968        // Also verify that unsubscribing twice returns false the second time
1969        property.unsubscribe(valid_id).expect("Failed first unsubscribe"); // First unsubscribe should return true
1970
1971        let result = property.unsubscribe(valid_id).expect("Failed second unsubscribe");
1972        assert!(!result, "Second unsubscribe should return false");
1973    }
1974
1975    #[test]
1976    fn test_observer_id_wraparound() {
1977        let prop = ObservableProperty::new(0);
1978
1979        // Test that observer IDs increment properly and don't wrap around unexpectedly
1980        let id1 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 1");
1981        let id2 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 2");
1982        let id3 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 3");
1983
1984        assert!(id2 > id1, "Observer IDs should increment");
1985        assert!(id3 > id2, "Observer IDs should continue incrementing");
1986        assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
1987        assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");
1988
1989        // Clean up
1990        prop.unsubscribe(id1).expect("Failed to unsubscribe observer 1");
1991        prop.unsubscribe(id2).expect("Failed to unsubscribe observer 2");
1992        prop.unsubscribe(id3).expect("Failed to unsubscribe observer 3");
1993    }
1994
1995    #[test]
1996    fn test_concurrent_subscribe_unsubscribe() {
1997        let prop = Arc::new(ObservableProperty::new(0));
1998        let num_threads = 8;
1999        let operations_per_thread = 100;
2000
2001        let handles: Vec<_> = (0..num_threads)
2002            .map(|thread_id| {
2003                let prop_clone = prop.clone();
2004                thread::spawn(move || {
2005                    let mut local_ids = Vec::new();
2006
2007                    for i in 0..operations_per_thread {
2008                        // Subscribe an observer
2009                        let observer_id = prop_clone
2010                            .subscribe(Arc::new(move |old, new| {
2011                                // Do some work to simulate real observer
2012                                let _ = thread_id + i + old + new;
2013                            }))
2014                            .expect("Subscribe should succeed");
2015
2016                        local_ids.push(observer_id);
2017
2018                        // Occasionally unsubscribe some observers
2019                        if i % 10 == 0 && !local_ids.is_empty() {
2020                            let idx = i % local_ids.len();
2021                            let id_to_remove = local_ids.remove(idx);
2022                            prop_clone
2023                                .unsubscribe(id_to_remove)
2024                                .expect("Unsubscribe should succeed");
2025                        }
2026                    }
2027
2028                    // Clean up remaining observers
2029                    for id in local_ids {
2030                        prop_clone
2031                            .unsubscribe(id)
2032                            .expect("Final cleanup should succeed");
2033                    }
2034                })
2035            })
2036            .collect();
2037
2038        // Wait for all threads to complete
2039        for handle in handles {
2040            handle.join().expect("Thread should complete successfully");
2041        }
2042
2043        // Property should still be functional
2044        prop.set(42)
2045            .expect("Property should still work after concurrent operations");
2046    }
2047
2048    #[test]
2049    fn test_multiple_observer_panics_isolation() {
2050        let prop = ObservableProperty::new(0);
2051        let successful_calls = Arc::new(AtomicUsize::new(0));
2052
2053        // Create multiple observers that will panic
2054        let _panic_id1 = prop
2055            .subscribe(Arc::new(|_, _| {
2056                panic!("First panic observer");
2057            }))
2058            .expect("Failed to subscribe first panic observer");
2059
2060        let _panic_id2 = prop
2061            .subscribe(Arc::new(|_, _| {
2062                panic!("Second panic observer");
2063            }))
2064            .expect("Failed to subscribe second panic observer");
2065
2066        // Create observers that should succeed despite the panics
2067        let count1 = successful_calls.clone();
2068        let _success_id1 = prop
2069            .subscribe(Arc::new(move |_, _| {
2070                count1.fetch_add(1, Ordering::SeqCst);
2071            }))
2072            .expect("Failed to subscribe first success observer");
2073
2074        let count2 = successful_calls.clone();
2075        let _success_id2 = prop
2076            .subscribe(Arc::new(move |_, _| {
2077                count2.fetch_add(1, Ordering::SeqCst);
2078            }))
2079            .expect("Failed to subscribe second success observer");
2080
2081        // Trigger all observers
2082        prop.set(42).expect("Failed to set property value for panic isolation test");
2083
2084        // Both successful observers should have been called despite the panics
2085        assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
2086    }
2087
2088    #[test]
2089    fn test_async_observer_panic_isolation() {
2090        let prop = ObservableProperty::new(0);
2091        let successful_calls = Arc::new(AtomicUsize::new(0));
2092
2093        // Create observer that will panic
2094        let _panic_id = prop
2095            .subscribe(Arc::new(|_, _| {
2096                panic!("Async panic observer");
2097            }))
2098            .expect("Failed to subscribe async panic observer");
2099
2100        // Create observer that should succeed
2101        let count = successful_calls.clone();
2102        let _success_id = prop
2103            .subscribe(Arc::new(move |_, _| {
2104                count.fetch_add(1, Ordering::SeqCst);
2105            }))
2106            .expect("Failed to subscribe async success observer");
2107
2108        // Use async set to trigger observers in background threads
2109        prop.set_async(42).expect("Failed to set property value asynchronously");
2110
2111        // Wait for async observers to complete
2112        thread::sleep(Duration::from_millis(100));
2113
2114        // The successful observer should have been called despite the panic
2115        assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
2116    }
2117
2118    #[test]
2119    fn test_very_large_observer_count() {
2120        let prop = ObservableProperty::new(0);
2121        let total_calls = Arc::new(AtomicUsize::new(0));
2122        let observer_count = 1000;
2123
2124        // Subscribe many observers
2125        let mut observer_ids = Vec::with_capacity(observer_count);
2126        for i in 0..observer_count {
2127            let count = total_calls.clone();
2128            let id = prop
2129                .subscribe(Arc::new(move |old, new| {
2130                    count.fetch_add(1, Ordering::SeqCst);
2131                    // Verify we got the right values
2132                    assert_eq!(*old, 0);
2133                    assert_eq!(*new, i + 1);
2134                }))
2135                .expect("Failed to subscribe large observer count test observer");
2136            observer_ids.push(id);
2137        }
2138
2139        // Trigger all observers
2140        prop.set(observer_count).expect("Failed to set property value for large observer count test");
2141
2142        // All observers should have been called
2143        assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
2144
2145        // Clean up
2146        for id in observer_ids {
2147            prop.unsubscribe(id).expect("Failed to unsubscribe observer in large count test");
2148        }
2149    }
2150
2151    #[test]
2152    fn test_observer_with_mutable_state() {
2153        let prop = ObservableProperty::new(0);
2154        let call_history = Arc::new(RwLock::new(Vec::new()));
2155
2156        let history = call_history.clone();
2157        let observer_id = prop
2158            .subscribe(Arc::new(move |old, new| {
2159                if let Ok(mut hist) = history.write() {
2160                    hist.push((*old, *new));
2161                }
2162            }))
2163            .expect("Failed to subscribe mutable state observer");
2164
2165        // Make several changes
2166        prop.set(1).expect("Failed to set property to 1");
2167        prop.set(2).expect("Failed to set property to 2");
2168        prop.set(3).expect("Failed to set property to 3");
2169
2170        // Verify the history was recorded correctly
2171        let history = call_history.read().expect("Failed to read call history");
2172        assert_eq!(history.len(), 3);
2173        assert_eq!(history[0], (0, 1));
2174        assert_eq!(history[1], (1, 2));
2175        assert_eq!(history[2], (2, 3));
2176
2177        prop.unsubscribe(observer_id).expect("Failed to unsubscribe mutable state observer");
2178    }
2179
2180    #[test]
2181    fn test_subscription_automatic_cleanup() {
2182        let prop = ObservableProperty::new(0);
2183        let call_count = Arc::new(AtomicUsize::new(0));
2184
2185        // Test that subscription automatically cleans up when dropped
2186        {
2187            let count = call_count.clone();
2188            let _subscription = prop
2189                .subscribe_with_subscription(Arc::new(move |_, _| {
2190                    count.fetch_add(1, Ordering::SeqCst);
2191                }))
2192                .expect("Failed to create subscription for automatic cleanup test");
2193
2194            // Observer should be active while subscription is in scope
2195            prop.set(1).expect("Failed to set property value in subscription test");
2196            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2197
2198            // Subscription goes out of scope here and should auto-cleanup
2199        }
2200
2201        // Observer should no longer be active after subscription dropped
2202        prop.set(2).expect("Failed to set property value after subscription dropped");
2203        assert_eq!(call_count.load(Ordering::SeqCst), 1); // No additional calls
2204    }
2205
2206    #[test]
2207    fn test_subscription_explicit_drop() {
2208        let prop = ObservableProperty::new(0);
2209        let call_count = Arc::new(AtomicUsize::new(0));
2210
2211        let count = call_count.clone();
2212        let subscription = prop
2213            .subscribe_with_subscription(Arc::new(move |_, _| {
2214                count.fetch_add(1, Ordering::SeqCst);
2215            }))
2216            .expect("Failed to create subscription for explicit drop test");
2217
2218        // Observer should be active
2219        prop.set(1).expect("Failed to set property value before explicit drop");
2220        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2221
2222        // Explicitly drop the subscription
2223        drop(subscription);
2224
2225        // Observer should no longer be active
2226        prop.set(2).expect("Failed to set property value after explicit drop");
2227        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2228    }
2229
2230    #[test]
2231    fn test_multiple_subscriptions_with_cleanup() {
2232        let prop = ObservableProperty::new(0);
2233        let call_count1 = Arc::new(AtomicUsize::new(0));
2234        let call_count2 = Arc::new(AtomicUsize::new(0));
2235        let call_count3 = Arc::new(AtomicUsize::new(0));
2236
2237        let count1 = call_count1.clone();
2238        let count2 = call_count2.clone();
2239        let count3 = call_count3.clone();
2240
2241        let subscription1 = prop
2242            .subscribe_with_subscription(Arc::new(move |_, _| {
2243                count1.fetch_add(1, Ordering::SeqCst);
2244            }))
2245            .expect("Failed to create first subscription");
2246
2247        let subscription2 = prop
2248            .subscribe_with_subscription(Arc::new(move |_, _| {
2249                count2.fetch_add(1, Ordering::SeqCst);
2250            }))
2251            .expect("Failed to create second subscription");
2252
2253        let subscription3 = prop
2254            .subscribe_with_subscription(Arc::new(move |_, _| {
2255                count3.fetch_add(1, Ordering::SeqCst);
2256            }))
2257            .expect("Failed to create third subscription");
2258
2259        // All observers should be active
2260        prop.set(1).expect("Failed to set property value with all subscriptions");
2261        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
2262        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2263        assert_eq!(call_count3.load(Ordering::SeqCst), 1);
2264
2265        // Drop second subscription
2266        drop(subscription2);
2267
2268        // Only first and third should be active
2269        prop.set(2).expect("Failed to set property value with partial subscriptions");
2270        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2271        assert_eq!(call_count2.load(Ordering::SeqCst), 1); // No change
2272        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2273
2274        // Drop remaining subscriptions
2275        drop(subscription1);
2276        drop(subscription3);
2277
2278        // No observers should be active
2279        prop.set(3).expect("Failed to set property value with no subscriptions");
2280        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2281        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2282        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2283    }
2284
2285    #[test]
2286    fn test_subscription_drop_with_poisoned_lock() {
2287        let prop = Arc::new(ObservableProperty::new(0));
2288        let prop_clone = prop.clone();
2289
2290        // Create a subscription
2291        let call_count = Arc::new(AtomicUsize::new(0));
2292        let count = call_count.clone();
2293        let subscription = prop
2294            .subscribe_with_subscription(Arc::new(move |_, _| {
2295                count.fetch_add(1, Ordering::SeqCst);
2296            }))
2297            .expect("Failed to create subscription for poisoned lock test");
2298
2299        // Poison the lock by panicking while holding a write lock
2300        let poison_thread = thread::spawn(move || {
2301            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2302            panic!("Deliberate panic to poison the lock");
2303        });
2304        let _ = poison_thread.join(); // Ignore the panic result
2305
2306        // Dropping the subscription should not panic even with poisoned lock
2307        // This tests that the Drop implementation handles poisoned locks gracefully
2308        drop(subscription); // Should complete without panic
2309
2310        // Test passes if we reach here without panicking
2311    }
2312
2313    #[test]
2314    fn test_subscription_vs_manual_unsubscribe() {
2315        let prop = ObservableProperty::new(0);
2316        let auto_count = Arc::new(AtomicUsize::new(0));
2317        let manual_count = Arc::new(AtomicUsize::new(0));
2318
2319        // Manual subscription
2320        let manual_count_clone = manual_count.clone();
2321        let manual_id = prop
2322            .subscribe(Arc::new(move |_, _| {
2323                manual_count_clone.fetch_add(1, Ordering::SeqCst);
2324            }))
2325            .expect("Failed to create manual subscription");
2326
2327        // Automatic subscription
2328        let auto_count_clone = auto_count.clone();
2329        let _auto_subscription = prop
2330            .subscribe_with_subscription(Arc::new(move |_, _| {
2331                auto_count_clone.fetch_add(1, Ordering::SeqCst);
2332            }))
2333            .expect("Failed to create automatic subscription");
2334
2335        // Both should be active
2336        prop.set(1).expect("Failed to set property value with both subscriptions");
2337        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2338        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2339
2340        // Manual unsubscribe
2341        prop.unsubscribe(manual_id).expect("Failed to manually unsubscribe");
2342
2343        // Only automatic subscription should be active
2344        prop.set(2).expect("Failed to set property value after manual unsubscribe");
2345        assert_eq!(manual_count.load(Ordering::SeqCst), 1); // No change
2346        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2347
2348        // Auto subscription goes out of scope here and cleans up automatically
2349    }
2350
2351    #[test]
2352    fn test_subscribe_with_subscription_error_handling() {
2353        let prop = Arc::new(ObservableProperty::new(0));
2354        let prop_clone = prop.clone();
2355
2356        // Poison the lock
2357        let poison_thread = thread::spawn(move || {
2358            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2359            panic!("Deliberate panic to poison the lock");
2360        });
2361        let _ = poison_thread.join();
2362
2363        // With graceful degradation, subscribe_with_subscription should succeed
2364        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2365        assert!(result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
2366    }
2367
2368    #[test]
2369    fn test_subscription_with_property_cloning() {
2370        let prop1 = ObservableProperty::new(0);
2371        let prop2 = prop1.clone();
2372        let call_count = Arc::new(AtomicUsize::new(0));
2373
2374        // Subscribe to prop1
2375        let count = call_count.clone();
2376        let _subscription = prop1
2377            .subscribe_with_subscription(Arc::new(move |_, _| {
2378                count.fetch_add(1, Ordering::SeqCst);
2379            }))
2380            .expect("Failed to create subscription for cloned property test");
2381
2382        // Changes through prop2 should trigger the observer subscribed to prop1
2383        prop2.set(42).expect("Failed to set property value through prop2");
2384        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2385
2386        // Changes through prop1 should also trigger the observer
2387        prop1.set(100).expect("Failed to set property value through prop1");
2388        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2389    }
2390
2391    #[test]
2392    fn test_subscription_in_conditional_blocks() {
2393        let prop = ObservableProperty::new(0);
2394        let call_count = Arc::new(AtomicUsize::new(0));
2395
2396        let should_subscribe = true;
2397
2398        if should_subscribe {
2399            let count = call_count.clone();
2400            let _subscription = prop
2401                .subscribe_with_subscription(Arc::new(move |_, _| {
2402                    count.fetch_add(1, Ordering::SeqCst);
2403                }))
2404                .expect("Failed to create subscription in conditional block");
2405
2406            // Observer active within this block
2407            prop.set(1).expect("Failed to set property value in conditional block");
2408            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2409
2410            // Subscription dropped when exiting this block
2411        }
2412
2413        // Observer should be inactive now
2414        prop.set(2).expect("Failed to set property value after conditional block");
2415        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2416    }
2417
2418    #[test]
2419    fn test_subscription_with_early_return() {
2420        fn test_function(
2421            prop: &ObservableProperty<i32>,
2422            should_return_early: bool,
2423        ) -> Result<(), PropertyError> {
2424            let call_count = Arc::new(AtomicUsize::new(0));
2425            let count = call_count.clone();
2426
2427            let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2428                count.fetch_add(1, Ordering::SeqCst);
2429            }))?;
2430
2431            prop.set(1)?;
2432            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2433
2434            if should_return_early {
2435                return Ok(()); // Subscription should be cleaned up here
2436            }
2437
2438            prop.set(2)?;
2439            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2440
2441            Ok(())
2442            // Subscription cleaned up when function exits normally
2443        }
2444
2445        let prop = ObservableProperty::new(0);
2446
2447        // Test early return
2448        test_function(&prop, true).expect("Failed to test early return");
2449
2450        // Verify observer is no longer active after early return
2451        prop.set(10).expect("Failed to set property value after early return");
2452
2453        // Test normal exit
2454        test_function(&prop, false).expect("Failed to test normal exit");
2455
2456        // Verify observer is no longer active after normal exit
2457        prop.set(20).expect("Failed to set property value after normal exit");
2458    }
2459
2460    #[test]
2461    fn test_subscription_move_semantics() {
2462        let prop = ObservableProperty::new(0);
2463        let call_count = Arc::new(AtomicUsize::new(0));
2464
2465        let count = call_count.clone();
2466        let subscription = prop
2467            .subscribe_with_subscription(Arc::new(move |_, _| {
2468                count.fetch_add(1, Ordering::SeqCst);
2469            }))
2470            .expect("Failed to create subscription for move semantics test");
2471
2472        // Observer should be active
2473        prop.set(1).expect("Failed to set property value before move");
2474        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2475
2476        // Move subscription to a new variable
2477        let moved_subscription = subscription;
2478
2479        // Observer should still be active after move
2480        prop.set(2).expect("Failed to set property value after move");
2481        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2482
2483        // Drop the moved subscription
2484        drop(moved_subscription);
2485
2486        // Observer should now be inactive
2487        prop.set(3).expect("Failed to set property value after moved subscription drop");
2488        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2489    }
2490
2491    #[test]
2492    fn test_filtered_subscription_automatic_cleanup() {
2493        let prop = ObservableProperty::new(0);
2494        let call_count = Arc::new(AtomicUsize::new(0));
2495
2496        {
2497            let count = call_count.clone();
2498            let _subscription = prop
2499                .subscribe_filtered_with_subscription(
2500                    Arc::new(move |_, _| {
2501                        count.fetch_add(1, Ordering::SeqCst);
2502                    }),
2503                    |old, new| new > old, // Only trigger on increases
2504                )
2505                .expect("Failed to create filtered subscription");
2506
2507            // Should trigger (0 -> 5)
2508            prop.set(5).expect("Failed to set property value to 5 in filtered test");
2509            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2510
2511            // Should NOT trigger (5 -> 3)
2512            prop.set(3).expect("Failed to set property value to 3 in filtered test");
2513            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2514
2515            // Should trigger (3 -> 10)
2516            prop.set(10).expect("Failed to set property value to 10 in filtered test");
2517            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2518
2519            // Subscription goes out of scope here
2520        }
2521
2522        // Observer should be inactive after subscription cleanup
2523        prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
2524        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2525    }
2526
2527    #[test]
2528    fn test_multiple_filtered_subscriptions() {
2529        let prop = ObservableProperty::new(10);
2530        let increase_count = Arc::new(AtomicUsize::new(0));
2531        let decrease_count = Arc::new(AtomicUsize::new(0));
2532        let significant_change_count = Arc::new(AtomicUsize::new(0));
2533
2534        let inc_count = increase_count.clone();
2535        let dec_count = decrease_count.clone();
2536        let sig_count = significant_change_count.clone();
2537
2538        let _increase_sub = prop
2539            .subscribe_filtered_with_subscription(
2540                Arc::new(move |_, _| {
2541                    inc_count.fetch_add(1, Ordering::SeqCst);
2542                }),
2543                |old, new| new > old,
2544            )
2545            .expect("Failed to create increase subscription");
2546
2547        let _decrease_sub = prop
2548            .subscribe_filtered_with_subscription(
2549                Arc::new(move |_, _| {
2550                    dec_count.fetch_add(1, Ordering::SeqCst);
2551                }),
2552                |old, new| new < old,
2553            )
2554            .expect("Failed to create decrease subscription");
2555
2556        let _significant_sub = prop
2557            .subscribe_filtered_with_subscription(
2558                Arc::new(move |_, _| {
2559                    sig_count.fetch_add(1, Ordering::SeqCst);
2560                }),
2561                |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
2562            )
2563            .expect("Failed to create significant change subscription");
2564
2565        // Test increases
2566        prop.set(15).expect("Failed to set property to 15 in multiple filtered test"); // +5: triggers increase, not significant
2567        assert_eq!(increase_count.load(Ordering::SeqCst), 1);
2568        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2569        assert_eq!(significant_change_count.load(Ordering::SeqCst), 0);
2570
2571        // Test significant increase
2572        prop.set(25).expect("Failed to set property to 25 in multiple filtered test"); // +10: triggers increase and significant
2573        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2574        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2575        assert_eq!(significant_change_count.load(Ordering::SeqCst), 1);
2576
2577        // Test significant decrease
2578        prop.set(5).expect("Failed to set property to 5 in multiple filtered test"); // -20: triggers decrease and significant
2579        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2580        assert_eq!(decrease_count.load(Ordering::SeqCst), 1);
2581        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2582
2583        // Test small decrease
2584        prop.set(3).expect("Failed to set property to 3 in multiple filtered test"); // -2: triggers decrease, not significant
2585        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2586        assert_eq!(decrease_count.load(Ordering::SeqCst), 2);
2587        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2588
2589        // All subscriptions auto-cleanup when they go out of scope
2590    }
2591
2592    #[test]
2593    fn test_filtered_subscription_complex_filter() {
2594        let prop = ObservableProperty::new(0.0f64);
2595        let call_count = Arc::new(AtomicUsize::new(0));
2596        let values_received = Arc::new(RwLock::new(Vec::new()));
2597
2598        let count = call_count.clone();
2599        let values = values_received.clone();
2600        let _subscription = prop
2601            .subscribe_filtered_with_subscription(
2602                Arc::new(move |old, new| {
2603                    count.fetch_add(1, Ordering::SeqCst);
2604                    if let Ok(mut v) = values.write() {
2605                        v.push((*old, *new));
2606                    }
2607                }),
2608                |old, new| {
2609                    // Complex filter: trigger only when crossing integer boundaries
2610                    // and the change is significant (> 0.5)
2611                    let old_int = old.floor() as i32;
2612                    let new_int = new.floor() as i32;
2613                    old_int != new_int && (new - old).abs() > 0.5_f64
2614                },
2615            )
2616            .expect("Failed to create complex filtered subscription");
2617
2618        // Small changes within same integer - should not trigger
2619        prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
2620        prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
2621        assert_eq!(call_count.load(Ordering::SeqCst), 0);
2622
2623        // Cross integer boundary with significant change - should trigger
2624        prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test"); // Change of 0.6, which is > 0.5
2625        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2626
2627        // Small cross-boundary change - should not trigger
2628        prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
2629        prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test"); // Change of 0.2, less than 0.5
2630        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2631
2632        // Large cross-boundary change - should trigger
2633        prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
2634        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2635
2636        // Verify received values
2637        let values = values_received.read().expect("Failed to read values in complex filter test");
2638        assert_eq!(values.len(), 2);
2639        assert_eq!(values[0], (0.7, 1.3));
2640        assert_eq!(values[1], (2.1, 3.5));
2641    }
2642
2643    #[test]
2644    fn test_filtered_subscription_error_handling() {
2645        let prop = Arc::new(ObservableProperty::new(0));
2646        let prop_clone = prop.clone();
2647
2648        // Poison the lock
2649        let poison_thread = thread::spawn(move || {
2650            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
2651            panic!("Deliberate panic to poison the lock");
2652        });
2653        let _ = poison_thread.join();
2654
2655        // With graceful degradation, subscribe_filtered_with_subscription should succeed
2656        let result = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2657        assert!(result.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");
2658    }
2659
2660    #[test]
2661    fn test_filtered_subscription_vs_manual_filtered() {
2662        let prop = ObservableProperty::new(0);
2663        let auto_count = Arc::new(AtomicUsize::new(0));
2664        let manual_count = Arc::new(AtomicUsize::new(0));
2665
2666        // Manual filtered subscription
2667        let manual_count_clone = manual_count.clone();
2668        let manual_id = prop
2669            .subscribe_filtered(
2670                Arc::new(move |_, _| {
2671                    manual_count_clone.fetch_add(1, Ordering::SeqCst);
2672                }),
2673                |old, new| new > old,
2674            )
2675            .expect("Failed to create manual filtered subscription");
2676
2677        // Automatic filtered subscription
2678        let auto_count_clone = auto_count.clone();
2679        let _auto_subscription = prop
2680            .subscribe_filtered_with_subscription(
2681                Arc::new(move |_, _| {
2682                    auto_count_clone.fetch_add(1, Ordering::SeqCst);
2683                }),
2684                |old, new| new > old,
2685            )
2686            .expect("Failed to create automatic filtered subscription");
2687
2688        // Both should be triggered by increases
2689        prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
2690        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2691        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2692
2693        // Neither should be triggered by decreases
2694        prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
2695        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2696        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2697
2698        // Both should be triggered by increases again
2699        prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
2700        assert_eq!(manual_count.load(Ordering::SeqCst), 2);
2701        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2702
2703        // Manual cleanup
2704        prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");
2705
2706        // Only automatic subscription should be active
2707        prop.set(15).expect("Failed to set property to 15 after manual cleanup");
2708        assert_eq!(manual_count.load(Ordering::SeqCst), 2); // No change
2709        assert_eq!(auto_count.load(Ordering::SeqCst), 3);
2710
2711        // Auto subscription cleaned up when it goes out of scope
2712    }
2713
2714    #[test]
2715    fn test_filtered_subscription_with_panicking_filter() {
2716        let prop = ObservableProperty::new(0);
2717        let call_count = Arc::new(AtomicUsize::new(0));
2718
2719        let count = call_count.clone();
2720        let _subscription = prop
2721            .subscribe_filtered_with_subscription(
2722                Arc::new(move |_, _| {
2723                    count.fetch_add(1, Ordering::SeqCst);
2724                }),
2725                |_, new| {
2726                    if *new == 42 {
2727                        panic!("Filter panic on 42");
2728                    }
2729                    true // Accept all other values
2730                },
2731            )
2732            .expect("Failed to create panicking filter subscription");
2733
2734        // Normal value should work
2735        prop.set(1).expect("Failed to set property to 1 in panicking filter test");
2736        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2737
2738        // Value that causes filter to panic should be handled gracefully
2739        // The behavior here depends on how the filter panic is handled
2740        // In the current implementation, filter panics may cause the observer to not be called
2741        prop.set(42).expect("Failed to set property to 42 in panicking filter test");
2742
2743        // Observer should still work for subsequent normal values
2744        prop.set(2).expect("Failed to set property to 2 after filter panic");
2745        // Note: The exact count here depends on panic handling implementation
2746        // The important thing is that the system doesn't crash
2747    }
2748
2749    #[test]
2750    fn test_subscription_thread_safety() {
2751        let prop = Arc::new(ObservableProperty::new(0));
2752        let num_threads = 8;
2753        let operations_per_thread = 50;
2754        let total_calls = Arc::new(AtomicUsize::new(0));
2755
2756        let handles: Vec<_> = (0..num_threads)
2757            .map(|thread_id| {
2758                let prop_clone = prop.clone();
2759                let calls_clone = total_calls.clone();
2760
2761                thread::spawn(move || {
2762                    let mut local_subscriptions = Vec::new();
2763
2764                    for i in 0..operations_per_thread {
2765                        let calls = calls_clone.clone();
2766                        let subscription = prop_clone
2767                            .subscribe_with_subscription(Arc::new(move |old, new| {
2768                                calls.fetch_add(1, Ordering::SeqCst);
2769                                // Simulate some work
2770                                let _ = thread_id + i + old + new;
2771                            }))
2772                            .expect("Should be able to create subscription");
2773
2774                        local_subscriptions.push(subscription);
2775
2776                        // Trigger observers
2777                        prop_clone
2778                            .set(thread_id * 1000 + i)
2779                            .expect("Should be able to set value");
2780
2781                        // Occasionally drop some subscriptions
2782                        if i % 5 == 0 && !local_subscriptions.is_empty() {
2783                            local_subscriptions.remove(0); // Drop first subscription
2784                        }
2785                    }
2786
2787                    // All remaining subscriptions dropped when vector goes out of scope
2788                })
2789            })
2790            .collect();
2791
2792        // Wait for all threads to complete
2793        for handle in handles {
2794            handle.join().expect("Thread should complete successfully");
2795        }
2796
2797        // Property should still be functional after all the concurrent operations
2798        prop.set(9999).expect("Property should still work");
2799
2800        // We can't easily verify the exact call count due to the complex timing,
2801        // but we can verify that the system didn't crash and is still operational
2802        println!(
2803            "Total observer calls: {}",
2804            total_calls.load(Ordering::SeqCst)
2805        );
2806    }
2807
2808    #[test]
2809    fn test_subscription_cross_thread_drop() {
2810        let prop = Arc::new(ObservableProperty::new(0));
2811        let call_count = Arc::new(AtomicUsize::new(0));
2812
2813        // Create subscription in main thread
2814        let count = call_count.clone();
2815        let subscription = prop
2816            .subscribe_with_subscription(Arc::new(move |_, _| {
2817                count.fetch_add(1, Ordering::SeqCst);
2818            }))
2819            .expect("Failed to create subscription for cross-thread drop test");
2820
2821        // Verify observer is active
2822        prop.set(1).expect("Failed to set property value in cross-thread drop test");
2823        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2824
2825        // Move subscription to another thread and drop it there
2826        let prop_clone = prop.clone();
2827        let call_count_clone = call_count.clone();
2828
2829        let handle = thread::spawn(move || {
2830            // Verify observer is still active in the other thread
2831            prop_clone.set(2).expect("Failed to set property value in other thread");
2832            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2);
2833
2834            // Drop subscription in this thread
2835            drop(subscription);
2836
2837            // Verify observer is no longer active
2838            prop_clone.set(3).expect("Failed to set property value after drop in other thread");
2839            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2); // No change
2840        });
2841
2842        handle.join().expect("Failed to join cross-thread drop test thread");
2843
2844        // Verify observer is still inactive in main thread
2845        prop.set(4).expect("Failed to set property value after thread join");
2846        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2847    }
2848
2849    #[test]
2850    fn test_concurrent_subscription_creation_and_property_changes() {
2851        let prop = Arc::new(ObservableProperty::new(0));
2852        let total_notifications = Arc::new(AtomicUsize::new(0));
2853        let num_subscriber_threads = 4;
2854        let num_setter_threads = 2;
2855        let operations_per_thread = 25;
2856
2857        // Threads that create and destroy subscriptions
2858        let subscriber_handles: Vec<_> = (0..num_subscriber_threads)
2859            .map(|_| {
2860                let prop_clone = prop.clone();
2861                let notifications_clone = total_notifications.clone();
2862
2863                thread::spawn(move || {
2864                    for _ in 0..operations_per_thread {
2865                        let notifications = notifications_clone.clone();
2866                        let _subscription = prop_clone
2867                            .subscribe_with_subscription(Arc::new(move |_, _| {
2868                                notifications.fetch_add(1, Ordering::SeqCst);
2869                            }))
2870                            .expect("Should create subscription");
2871
2872                        // Keep subscription alive for a short time
2873                        thread::sleep(Duration::from_millis(1));
2874
2875                        // Subscription dropped when _subscription goes out of scope
2876                    }
2877                })
2878            })
2879            .collect();
2880
2881        // Threads that continuously change the property value
2882        let setter_handles: Vec<_> = (0..num_setter_threads)
2883            .map(|thread_id| {
2884                let prop_clone = prop.clone();
2885
2886                thread::spawn(move || {
2887                    for i in 0..operations_per_thread * 2 {
2888                        prop_clone
2889                            .set(thread_id * 10000 + i)
2890                            .expect("Should set value");
2891                        thread::sleep(Duration::from_millis(1));
2892                    }
2893                })
2894            })
2895            .collect();
2896
2897        // Wait for all threads to complete
2898        for handle in subscriber_handles
2899            .into_iter()
2900            .chain(setter_handles.into_iter())
2901        {
2902            handle.join().expect("Thread should complete");
2903        }
2904
2905        // System should be stable after concurrent operations
2906        prop.set(99999).expect("Property should still work");
2907
2908        println!(
2909            "Total notifications during concurrent test: {}",
2910            total_notifications.load(Ordering::SeqCst)
2911        );
2912    }
2913
2914    #[test]
2915    fn test_filtered_subscription_thread_safety() {
2916        let prop = Arc::new(ObservableProperty::new(0));
2917        let increase_notifications = Arc::new(AtomicUsize::new(0));
2918        let decrease_notifications = Arc::new(AtomicUsize::new(0));
2919        let num_threads = 6;
2920
2921        let handles: Vec<_> = (0..num_threads)
2922            .map(|thread_id| {
2923                let prop_clone = prop.clone();
2924                let inc_notifications = increase_notifications.clone();
2925                let dec_notifications = decrease_notifications.clone();
2926
2927                thread::spawn(move || {
2928                    // Create increase-only subscription
2929                    let inc_count = inc_notifications.clone();
2930                    let _inc_subscription = prop_clone
2931                        .subscribe_filtered_with_subscription(
2932                            Arc::new(move |_, _| {
2933                                inc_count.fetch_add(1, Ordering::SeqCst);
2934                            }),
2935                            |old, new| new > old,
2936                        )
2937                        .expect("Should create filtered subscription");
2938
2939                    // Create decrease-only subscription
2940                    let dec_count = dec_notifications.clone();
2941                    let _dec_subscription = prop_clone
2942                        .subscribe_filtered_with_subscription(
2943                            Arc::new(move |_, _| {
2944                                dec_count.fetch_add(1, Ordering::SeqCst);
2945                            }),
2946                            |old, new| new < old,
2947                        )
2948                        .expect("Should create filtered subscription");
2949
2950                    // Perform some property changes
2951                    let base_value = thread_id * 100;
2952                    for i in 0..20 {
2953                        let new_value = base_value + (i % 10); // Creates increases and decreases
2954                        prop_clone.set(new_value).expect("Should set value");
2955                        thread::sleep(Duration::from_millis(1));
2956                    }
2957
2958                    // Subscriptions automatically cleaned up when going out of scope
2959                })
2960            })
2961            .collect();
2962
2963        // Wait for all threads
2964        for handle in handles {
2965            handle.join().expect("Thread should complete");
2966        }
2967
2968        // Verify system is still operational
2969        let initial_inc = increase_notifications.load(Ordering::SeqCst);
2970        let initial_dec = decrease_notifications.load(Ordering::SeqCst);
2971
2972        prop.set(1000).expect("Property should still work");
2973        prop.set(2000).expect("Property should still work");
2974
2975        // No new notifications should occur (all subscriptions cleaned up)
2976        assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
2977        assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);
2978
2979        println!(
2980            "Increase notifications: {}, Decrease notifications: {}",
2981            initial_inc, initial_dec
2982        );
2983    }
2984
2985    #[test]
2986    fn test_subscription_with_async_property_changes() {
2987        let prop = Arc::new(ObservableProperty::new(0));
2988        let sync_notifications = Arc::new(AtomicUsize::new(0));
2989        let async_notifications = Arc::new(AtomicUsize::new(0));
2990
2991        // Subscription that tracks sync notifications
2992        let sync_count = sync_notifications.clone();
2993        let _sync_subscription = prop
2994            .subscribe_with_subscription(Arc::new(move |old, new| {
2995                sync_count.fetch_add(1, Ordering::SeqCst);
2996                // Simulate slow observer work
2997                thread::sleep(Duration::from_millis(5));
2998                println!("Sync observer: {} -> {}", old, new);
2999            }))
3000            .expect("Failed to create sync subscription");
3001
3002        // Subscription that tracks async notifications
3003        let async_count = async_notifications.clone();
3004        let _async_subscription = prop
3005            .subscribe_with_subscription(Arc::new(move |old, new| {
3006                async_count.fetch_add(1, Ordering::SeqCst);
3007                println!("Async observer: {} -> {}", old, new);
3008            }))
3009            .expect("Failed to create async subscription");
3010
3011        // Test sync property changes
3012        let start = std::time::Instant::now();
3013        prop.set(1).expect("Failed to set property value 1 in async test");
3014        prop.set(2).expect("Failed to set property value 2 in async test");
3015        let sync_duration = start.elapsed();
3016
3017        // Test async property changes
3018        let start = std::time::Instant::now();
3019        prop.set_async(3).expect("Failed to set property value 3 async");
3020        prop.set_async(4).expect("Failed to set property value 4 async");
3021        let async_duration = start.elapsed();
3022
3023        // Async should be much faster
3024        assert!(async_duration < sync_duration);
3025
3026        // Wait for async observers to complete
3027        thread::sleep(Duration::from_millis(50));
3028
3029        // All observers should have been called
3030        assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
3031        assert_eq!(async_notifications.load(Ordering::SeqCst), 4);
3032
3033        // Subscriptions auto-cleanup when going out of scope
3034    }
3035
3036    #[test]
3037    fn test_subscription_creation_with_poisoned_lock() {
3038        let prop = Arc::new(ObservableProperty::new(0));
3039        let prop_clone = prop.clone();
3040
3041        // Create a valid subscription before poisoning
3042        let call_count = Arc::new(AtomicUsize::new(0));
3043        let count = call_count.clone();
3044        let existing_subscription = prop
3045            .subscribe_with_subscription(Arc::new(move |_, _| {
3046                count.fetch_add(1, Ordering::SeqCst);
3047            }))
3048            .expect("Failed to create subscription before poisoning");
3049
3050        // Poison the lock
3051        let poison_thread = thread::spawn(move || {
3052            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for subscription poison test");
3053            panic!("Deliberate panic to poison the lock");
3054        });
3055        let _ = poison_thread.join();
3056
3057        // With graceful degradation, new subscription creation should succeed
3058        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
3059        assert!(result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
3060
3061        // New filtered subscription creation should also succeed
3062        let filtered_result =
3063            prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
3064        assert!(filtered_result.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");
3065
3066        // Dropping existing subscription should not panic
3067        drop(existing_subscription);
3068    }
3069
3070    #[test]
3071    fn test_subscription_cleanup_behavior_with_poisoned_lock() {
3072        // This test specifically verifies that Drop doesn't panic with poisoned locks
3073        let prop = Arc::new(ObservableProperty::new(0));
3074        let call_count = Arc::new(AtomicUsize::new(0));
3075
3076        // Create subscription
3077        let count = call_count.clone();
3078        let subscription = prop
3079            .subscribe_with_subscription(Arc::new(move |_, _| {
3080                count.fetch_add(1, Ordering::SeqCst);
3081            }))
3082            .expect("Failed to create subscription for cleanup behavior test");
3083
3084        // Verify it works initially
3085        prop.set(1).expect("Failed to set property value in cleanup behavior test");
3086        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3087
3088        // Poison the lock from another thread
3089        let prop_clone = prop.clone();
3090        let poison_thread = thread::spawn(move || {
3091            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for cleanup behavior poison test");
3092            panic!("Deliberate panic to poison the lock");
3093        });
3094        let _ = poison_thread.join();
3095
3096        // Now drop the subscription - this should NOT panic
3097        // The Drop implementation should handle the poisoned lock gracefully
3098        drop(subscription);
3099
3100        // Test succeeds if we reach this point without panicking
3101    }
3102
3103    #[test]
3104    fn test_multiple_subscription_cleanup_with_poisoned_lock() {
3105        let prop = Arc::new(ObservableProperty::new(0));
3106        let mut subscriptions = Vec::new();
3107
3108        // Create multiple subscriptions
3109        for i in 0..5 {
3110            let call_count = Arc::new(AtomicUsize::new(0));
3111            let count = call_count.clone();
3112            let subscription = prop
3113                .subscribe_with_subscription(Arc::new(move |old, new| {
3114                    count.fetch_add(1, Ordering::SeqCst);
3115                    println!("Observer {}: {} -> {}", i, old, new);
3116                }))
3117                .expect("Failed to create subscription in multiple cleanup test");
3118            subscriptions.push(subscription);
3119        }
3120
3121        // Verify they all work
3122        prop.set(42).expect("Failed to set property value in multiple cleanup test");
3123
3124        // Poison the lock
3125        let prop_clone = prop.clone();
3126        let poison_thread = thread::spawn(move || {
3127            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for multiple cleanup poison test");
3128            panic!("Deliberate panic to poison the lock");
3129        });
3130        let _ = poison_thread.join();
3131
3132        // Drop all subscriptions - none should panic
3133        for subscription in subscriptions {
3134            drop(subscription);
3135        }
3136
3137        // Test succeeds if no panics occurred
3138    }
3139
3140    #[test]
3141    fn test_subscription_behavior_before_and_after_poison() {
3142        let prop = Arc::new(ObservableProperty::new(0));
3143        let before_poison_count = Arc::new(AtomicUsize::new(0));
3144        let after_poison_count = Arc::new(AtomicUsize::new(0));
3145
3146        // Create subscription before poisoning
3147        let before_count = before_poison_count.clone();
3148        let before_subscription = prop
3149            .subscribe_with_subscription(Arc::new(move |_, _| {
3150                before_count.fetch_add(1, Ordering::SeqCst);
3151            }))
3152            .expect("Failed to create subscription before poison test");
3153
3154        // Verify it works
3155        prop.set(1).expect("Failed to set property value before poison test");
3156        assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);
3157
3158        // Poison the lock
3159        let prop_clone = prop.clone();
3160        let poison_thread = thread::spawn(move || {
3161            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for before/after poison test");
3162            panic!("Deliberate panic to poison the lock");
3163        });
3164        let _ = poison_thread.join();
3165
3166        // With graceful degradation, subscription creation after poisoning should succeed
3167        let after_count = after_poison_count.clone();
3168        let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
3169            after_count.fetch_add(1, Ordering::SeqCst);
3170        }));
3171        assert!(after_result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
3172        
3173        let _after_subscription = after_result.unwrap();
3174
3175        // Clean up the before-poison subscription - should not panic
3176        drop(before_subscription);
3177    }
3178
3179    #[test]
3180    fn test_concurrent_subscription_drops_with_poison() {
3181        let prop = Arc::new(ObservableProperty::new(0));
3182        let num_subscriptions = 10;
3183        let mut subscriptions = Vec::new();
3184
3185        // Create multiple subscriptions
3186        for i in 0..num_subscriptions {
3187            let call_count = Arc::new(AtomicUsize::new(0));
3188            let count = call_count.clone();
3189            let subscription = prop
3190                .subscribe_with_subscription(Arc::new(move |_, _| {
3191                    count.fetch_add(1, Ordering::SeqCst);
3192                    println!("Observer {}", i);
3193                }))
3194                .expect("Failed to create subscription in concurrent drops test");
3195            subscriptions.push(subscription);
3196        }
3197
3198        // Poison the lock
3199        let prop_clone = prop.clone();
3200        let poison_thread = thread::spawn(move || {
3201            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for concurrent drops poison test");
3202            panic!("Deliberate panic to poison the lock");
3203        });
3204        let _ = poison_thread.join();
3205
3206        // Drop subscriptions concurrently from multiple threads
3207        let handles: Vec<_> = subscriptions
3208            .into_iter()
3209            .enumerate()
3210            .map(|(i, subscription)| {
3211                thread::spawn(move || {
3212                    // Add some randomness to timing
3213                    thread::sleep(Duration::from_millis(i as u64 % 5));
3214                    drop(subscription);
3215                    println!("Dropped subscription {}", i);
3216                })
3217            })
3218            .collect();
3219
3220        // Wait for all drops to complete
3221        for handle in handles {
3222            handle
3223                .join()
3224                .expect("Drop thread should complete without panic");
3225        }
3226
3227        // Test succeeds if all threads completed successfully
3228    }
3229
3230    
3231    #[test]
3232    fn test_with_max_threads_creation() {
3233        // Test creation with various thread counts
3234        let prop1 = ObservableProperty::with_max_threads(42, 1);
3235        let prop2 = ObservableProperty::with_max_threads("test".to_string(), 8);
3236        let prop3 = ObservableProperty::with_max_threads(0.5_f64, 16);
3237
3238        // Verify initial values are correct
3239        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 42);
3240        assert_eq!(prop2.get().expect("Failed to get prop2 value"), "test");
3241        assert_eq!(prop3.get().expect("Failed to get prop3 value"), 0.5);
3242    }
3243
3244    #[test]
3245    fn test_with_max_threads_zero_defaults_to_max_threads() {
3246        // Test that zero max_threads defaults to MAX_THREADS (4)
3247        let prop1 = ObservableProperty::with_max_threads(100, 0);
3248        let prop2 = ObservableProperty::new(100); // Uses default MAX_THREADS
3249
3250        // Both should have the same max_threads value
3251        // We can't directly access max_threads, but we can verify behavior is consistent
3252        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 100);
3253        assert_eq!(prop2.get().expect("Failed to get prop2 value"), 100);
3254    }
3255
3256    #[test]
3257    fn test_with_max_threads_basic_functionality() {
3258        let prop = ObservableProperty::with_max_threads(0, 2);
3259        let call_count = Arc::new(AtomicUsize::new(0));
3260
3261        // Subscribe an observer
3262        let count = call_count.clone();
3263        let _subscription = prop
3264            .subscribe_with_subscription(Arc::new(move |old, new| {
3265                count.fetch_add(1, Ordering::SeqCst);
3266                assert_eq!(*old, 0);
3267                assert_eq!(*new, 42);
3268            }))
3269            .expect("Failed to create subscription for max_threads test");
3270
3271        // Test synchronous set
3272        prop.set(42).expect("Failed to set value synchronously");
3273        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3274
3275        // Test asynchronous set
3276        prop.set_async(43).expect("Failed to set value asynchronously");
3277        
3278        // Wait for async observers to complete
3279        thread::sleep(Duration::from_millis(50));
3280        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3281    }
3282
3283    #[test]
3284    fn test_with_max_threads_async_performance() {
3285        // Test that with_max_threads affects async performance
3286        let prop = ObservableProperty::with_max_threads(0, 1); // Single thread
3287        let slow_call_count = Arc::new(AtomicUsize::new(0));
3288
3289        // Add multiple slow observers
3290        let mut subscriptions = Vec::new();
3291        for _ in 0..4 {
3292            let count = slow_call_count.clone();
3293            let subscription = prop
3294                .subscribe_with_subscription(Arc::new(move |_, _| {
3295                    thread::sleep(Duration::from_millis(25)); // Simulate slow work
3296                    count.fetch_add(1, Ordering::SeqCst);
3297                }))
3298                .expect("Failed to create slow observer subscription");
3299            subscriptions.push(subscription);
3300        }
3301
3302        // Measure time for async notification
3303        let start = std::time::Instant::now();
3304        prop.set_async(42).expect("Failed to set value asynchronously");
3305        let async_duration = start.elapsed();
3306
3307        // Should return quickly even with slow observers
3308        assert!(async_duration.as_millis() < 50, "Async set should return quickly");
3309
3310        // Wait for all observers to complete
3311        thread::sleep(Duration::from_millis(200));
3312        assert_eq!(slow_call_count.load(Ordering::SeqCst), 4);
3313    }
3314
3315    #[test]
3316    fn test_with_max_threads_vs_regular_constructor() {
3317        let prop_regular = ObservableProperty::new(42);
3318        let prop_custom = ObservableProperty::with_max_threads(42, 4); // Same as default
3319
3320        let count_regular = Arc::new(AtomicUsize::new(0));
3321        let count_custom = Arc::new(AtomicUsize::new(0));
3322
3323        // Both should behave identically
3324        let count1 = count_regular.clone();
3325        let _sub1 = prop_regular
3326            .subscribe_with_subscription(Arc::new(move |_, _| {
3327                count1.fetch_add(1, Ordering::SeqCst);
3328            }))
3329            .expect("Failed to create regular subscription");
3330
3331        let count2 = count_custom.clone();
3332        let _sub2 = prop_custom
3333            .subscribe_with_subscription(Arc::new(move |_, _| {
3334                count2.fetch_add(1, Ordering::SeqCst);
3335            }))
3336            .expect("Failed to create custom subscription");
3337
3338        // Test sync behavior
3339        prop_regular.set(100).expect("Failed to set regular property");
3340        prop_custom.set(100).expect("Failed to set custom property");
3341
3342        assert_eq!(count_regular.load(Ordering::SeqCst), 1);
3343        assert_eq!(count_custom.load(Ordering::SeqCst), 1);
3344
3345        // Test async behavior
3346        prop_regular.set_async(200).expect("Failed to set regular property async");
3347        prop_custom.set_async(200).expect("Failed to set custom property async");
3348
3349        thread::sleep(Duration::from_millis(50));
3350        assert_eq!(count_regular.load(Ordering::SeqCst), 2);
3351        assert_eq!(count_custom.load(Ordering::SeqCst), 2);
3352    }
3353
3354    #[test]
3355    fn test_with_max_threads_large_values() {
3356        // Test with very large max_threads values
3357        let prop = ObservableProperty::with_max_threads(0, 1000);
3358        let call_count = Arc::new(AtomicUsize::new(0));
3359
3360        // Add a few observers
3361        let count = call_count.clone();
3362        let _subscription = prop
3363            .subscribe_with_subscription(Arc::new(move |_, _| {
3364                count.fetch_add(1, Ordering::SeqCst);
3365            }))
3366            .expect("Failed to create subscription for large max_threads test");
3367
3368        // Should work normally even with excessive thread limit
3369        prop.set_async(42).expect("Failed to set value with large max_threads");
3370
3371        thread::sleep(Duration::from_millis(50));
3372        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3373    }
3374
3375    #[test]
3376    fn test_with_max_threads_clone_behavior() {
3377        let prop1 = ObservableProperty::with_max_threads(42, 2);
3378        let prop2 = prop1.clone();
3379
3380        let call_count1 = Arc::new(AtomicUsize::new(0));
3381        let call_count2 = Arc::new(AtomicUsize::new(0));
3382
3383        // Subscribe to both properties
3384        let count1 = call_count1.clone();
3385        let _sub1 = prop1
3386            .subscribe_with_subscription(Arc::new(move |_, _| {
3387                count1.fetch_add(1, Ordering::SeqCst);
3388            }))
3389            .expect("Failed to create subscription for cloned property test");
3390
3391        let count2 = call_count2.clone();
3392        let _sub2 = prop2
3393            .subscribe_with_subscription(Arc::new(move |_, _| {
3394                count2.fetch_add(1, Ordering::SeqCst);
3395            }))
3396            .expect("Failed to create subscription for original property test");
3397
3398        // Changes through either property should trigger both observers
3399        prop1.set_async(100).expect("Failed to set value through prop1");
3400        thread::sleep(Duration::from_millis(50));
3401        
3402        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
3403        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
3404
3405        prop2.set_async(200).expect("Failed to set value through prop2");
3406        thread::sleep(Duration::from_millis(50));
3407        
3408        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
3409        assert_eq!(call_count2.load(Ordering::SeqCst), 2);
3410    }
3411
3412    #[test]
3413    fn test_with_max_threads_thread_safety() {
3414        let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
3415        let call_count = Arc::new(AtomicUsize::new(0));
3416
3417        // Add observers from multiple threads
3418        let handles: Vec<_> = (0..5)
3419            .map(|thread_id| {
3420                let prop_clone = prop.clone();
3421                let count_clone = call_count.clone();
3422
3423                thread::spawn(move || {
3424                    let count = count_clone.clone();
3425                    let _subscription = prop_clone
3426                        .subscribe_with_subscription(Arc::new(move |_, _| {
3427                            count.fetch_add(1, Ordering::SeqCst);
3428                        }))
3429                        .expect("Failed to create thread-safe subscription");
3430
3431                    // Trigger async notifications from this thread
3432                    prop_clone
3433                        .set_async(thread_id * 10)
3434                        .expect("Failed to set value from thread");
3435                        
3436                    thread::sleep(Duration::from_millis(10));
3437                })
3438            })
3439            .collect();
3440
3441        // Wait for all threads to complete
3442        for handle in handles {
3443            handle.join().expect("Thread should complete successfully");
3444        }
3445
3446        // Wait for all async operations to complete
3447        thread::sleep(Duration::from_millis(100));
3448
3449        // Each set_async should trigger all active observers at that time
3450        // The exact count depends on timing, but should be > 0
3451        assert!(call_count.load(Ordering::SeqCst) > 0);
3452    }
3453
3454    #[test]
3455    fn test_with_max_threads_error_handling() {
3456        let prop = ObservableProperty::with_max_threads(42, 2);
3457        
3458        // Test that error handling works the same as regular properties
3459        let _subscription = prop
3460            .subscribe_with_subscription(Arc::new(|_, _| {
3461                // Normal observer
3462            }))
3463            .expect("Failed to create subscription for error handling test");
3464
3465        // Should handle errors gracefully
3466        assert!(prop.set(100).is_ok());
3467        assert!(prop.set_async(200).is_ok());
3468        assert_eq!(prop.get().expect("Failed to get value after error test"), 200);
3469    }
3470
3471    #[test]
3472    fn test_observer_limit_enforcement() {
3473        let prop = ObservableProperty::new(0);
3474        let mut observer_ids = Vec::new();
3475
3476        // Add observers up to the limit (using a small test to avoid slow tests)
3477        // In reality, MAX_OBSERVERS is 10,000, but we'll test the mechanism
3478        // by adding a reasonable number and then checking the error message
3479        for i in 0..100 {
3480            let result = prop.subscribe(Arc::new(move |_, _| {
3481                let _ = i; // Use the capture to make each observer unique
3482            }));
3483            assert!(result.is_ok(), "Should be able to add observer {}", i);
3484            observer_ids.push(result.unwrap());
3485        }
3486
3487        // Verify all observers were added
3488        assert_eq!(observer_ids.len(), 100);
3489
3490        // Verify we can still add more (we're well under the 10,000 limit)
3491        let result = prop.subscribe(Arc::new(|_, _| {}));
3492        assert!(result.is_ok());
3493    }
3494
3495    #[test]
3496    fn test_observer_limit_error_message() {
3497        let prop = ObservableProperty::new(0);
3498        
3499        // We can't easily test hitting the actual 10,000 limit in a unit test
3500        // (it would be too slow), but we can verify the error type exists
3501        // and the subscribe method has the check
3502        
3503        // Add a few observers successfully
3504        for _ in 0..10 {
3505            assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3506        }
3507
3508        // The mechanism is in place - the limit check happens before insertion
3509        // In production, if 10,000 observers are added, the 10,001st will fail
3510    }
3511
3512    #[test]
3513    fn test_observer_limit_with_unsubscribe() {
3514        let prop = ObservableProperty::new(0);
3515        
3516        // Add observers
3517        let mut ids = Vec::new();
3518        for _ in 0..50 {
3519            ids.push(prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe"));
3520        }
3521
3522        // Remove half of them
3523        for id in ids.iter().take(25) {
3524            assert!(prop.unsubscribe(*id).expect("Failed to unsubscribe"));
3525        }
3526
3527        // Should be able to add more observers after unsubscribing
3528        for _ in 0..30 {
3529            assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3530        }
3531    }
3532
3533    #[test]
3534    fn test_observer_limit_with_raii_subscriptions() {
3535        let prop = ObservableProperty::new(0);
3536        
3537        // Create RAII subscriptions
3538        let mut subscriptions = Vec::new();
3539        for _ in 0..50 {
3540            subscriptions.push(
3541                prop.subscribe_with_subscription(Arc::new(|_, _| {}))
3542                    .expect("Failed to create subscription")
3543            );
3544        }
3545
3546        // Drop half of them (automatic cleanup)
3547        subscriptions.truncate(25);
3548
3549        // Should be able to add more after RAII cleanup
3550        for _ in 0..30 {
3551            let _sub = prop.subscribe_with_subscription(Arc::new(|_, _| {}))
3552                .expect("Failed to create subscription after RAII cleanup");
3553        }
3554    }
3555
3556    #[test]
3557    fn test_filtered_subscription_respects_observer_limit() {
3558        let prop = ObservableProperty::new(0);
3559        
3560        // Add regular and filtered observers
3561        for i in 0..50 {
3562            if i % 2 == 0 {
3563                assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3564            } else {
3565                assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok());
3566            }
3567        }
3568
3569        // Both types count toward the limit
3570        // Should still be well under the 10,000 limit
3571        assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok());
3572    }
3573
3574    #[test]
3575    fn test_observer_limit_concurrent_subscriptions() {
3576        let prop = Arc::new(ObservableProperty::new(0));
3577        let success_count = Arc::new(AtomicUsize::new(0));
3578
3579        // Try to add observers from multiple threads
3580        let handles: Vec<_> = (0..10)
3581            .map(|_| {
3582                let prop_clone = prop.clone();
3583                let count_clone = success_count.clone();
3584                
3585                thread::spawn(move || {
3586                    for _ in 0..10 {
3587                        if prop_clone.subscribe(Arc::new(|_, _| {})).is_ok() {
3588                            count_clone.fetch_add(1, Ordering::SeqCst);
3589                        }
3590                        thread::sleep(Duration::from_micros(10));
3591                    }
3592                })
3593            })
3594            .collect();
3595
3596        for handle in handles {
3597            handle.join().expect("Thread should complete");
3598        }
3599
3600        // All 100 subscriptions should succeed (well under limit)
3601        assert_eq!(success_count.load(Ordering::SeqCst), 100);
3602    }
3603
3604    #[test]
3605    fn test_notify_observers_batch_releases_lock_early() {
3606        use std::sync::atomic::AtomicBool;
3607        
3608        let prop = Arc::new(ObservableProperty::new(0));
3609        let call_count = Arc::new(AtomicUsize::new(0));
3610        let started = Arc::new(AtomicBool::new(false));
3611        
3612        // Subscribe with a slow observer
3613        let started_clone = started.clone();
3614        let count_clone = call_count.clone();
3615        prop.subscribe(Arc::new(move |_, _| {
3616            started_clone.store(true, Ordering::SeqCst);
3617            count_clone.fetch_add(1, Ordering::SeqCst);
3618            // Simulate slow observer
3619            thread::sleep(Duration::from_millis(50));
3620        })).expect("Failed to subscribe");
3621        
3622        // Start batch notification in background
3623        let prop_clone = prop.clone();
3624        let batch_handle = thread::spawn(move || {
3625            prop_clone.notify_observers_batch(vec![(0, 1), (1, 2)]).expect("Failed to notify batch");
3626        });
3627        
3628        // Wait for observer to start
3629        while !started.load(Ordering::SeqCst) {
3630            thread::sleep(Duration::from_millis(1));
3631        }
3632        
3633        // Now verify we can still subscribe while observer is running
3634        // This proves the lock was released before observer execution
3635        let subscribe_result = prop.subscribe(Arc::new(|_, _| {
3636            // New observer
3637        }));
3638        
3639        assert!(subscribe_result.is_ok(), "Should be able to subscribe while batch notification is in progress");
3640        
3641        // Wait for batch to complete
3642        batch_handle.join().expect("Batch thread should complete");
3643        
3644        // Verify observers were called (2 changes in batch)
3645        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3646    }
3647
3648    #[test]
3649    fn test_notify_observers_batch_panic_isolation() {
3650        let prop = ObservableProperty::new(0);
3651        let good_observer_count = Arc::new(AtomicUsize::new(0));
3652        let count_clone = good_observer_count.clone();
3653        
3654        // First observer that panics
3655        prop.subscribe(Arc::new(|_, _| {
3656            panic!("Deliberate panic in batch observer");
3657        })).expect("Failed to subscribe panicking observer");
3658        
3659        // Second observer that should still be called
3660        prop.subscribe(Arc::new(move |_, _| {
3661            count_clone.fetch_add(1, Ordering::SeqCst);
3662        })).expect("Failed to subscribe good observer");
3663        
3664        // Batch notification should not fail despite panic
3665        let result = prop.notify_observers_batch(vec![(0, 1), (1, 2), (2, 3)]);
3666        assert!(result.is_ok());
3667        
3668        // Second observer should have been called for all 3 changes
3669        assert_eq!(good_observer_count.load(Ordering::SeqCst), 3);
3670    }
3671
3672    #[test]
3673    fn test_notify_observers_batch_multiple_changes() {
3674        let prop = ObservableProperty::new(0);
3675        let received_changes = Arc::new(RwLock::new(Vec::new()));
3676        let changes_clone = received_changes.clone();
3677        
3678        prop.subscribe(Arc::new(move |old, new| {
3679            if let Ok(mut changes) = changes_clone.write() {
3680                changes.push((*old, *new));
3681            }
3682        })).expect("Failed to subscribe");
3683        
3684        // Send multiple changes
3685        prop.notify_observers_batch(vec![
3686            (0, 10),
3687            (10, 20),
3688            (20, 30),
3689            (30, 40),
3690        ]).expect("Failed to notify batch");
3691        
3692        let changes = received_changes.read().expect("Failed to read changes");
3693        assert_eq!(changes.len(), 4);
3694        assert_eq!(changes[0], (0, 10));
3695        assert_eq!(changes[1], (10, 20));
3696        assert_eq!(changes[2], (20, 30));
3697        assert_eq!(changes[3], (30, 40));
3698    }
3699
3700    #[test]
3701    fn test_notify_observers_batch_empty() {
3702        let prop = ObservableProperty::new(0);
3703        let call_count = Arc::new(AtomicUsize::new(0));
3704        let count_clone = call_count.clone();
3705        
3706        prop.subscribe(Arc::new(move |_, _| {
3707            count_clone.fetch_add(1, Ordering::SeqCst);
3708        })).expect("Failed to subscribe");
3709        
3710        // Empty batch should succeed without calling observers
3711        prop.notify_observers_batch(vec![]).expect("Failed with empty batch");
3712        
3713        assert_eq!(call_count.load(Ordering::SeqCst), 0);
3714    }
3715}