observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
14//! - **Graceful lock recovery**: Continues operation even after lock poisoning from panics
15//! - **Memory protection**: Observer limit (10,000) prevents memory exhaustion
16//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
17//!
18//! ## Quick Start
19//!
20//! ```rust
21//! use observable_property::ObservableProperty;
22//! use std::sync::Arc;
23//!
24//! // Create an observable property
25//! let property = ObservableProperty::new(42);
26//!
27//! // Subscribe to changes
28//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
29//!     println!("Value changed from {} to {}", old_value, new_value);
30//! })).map_err(|e| {
31//!     eprintln!("Failed to subscribe: {}", e);
32//!     e
33//! })?;
34//!
35//! // Change the value (triggers observer)
36//! property.set(100).map_err(|e| {
37//!     eprintln!("Failed to set value: {}", e);
38//!     e
39//! })?;
40//!
41//! // Unsubscribe when done
42//! property.unsubscribe(observer_id).map_err(|e| {
43//!     eprintln!("Failed to unsubscribe: {}", e);
44//!     e
45//! })?;
46//! # Ok::<(), observable_property::PropertyError>(())
47//! ```
48//!
49//! ## Multi-threading Example
50//!
51//! ```rust
52//! use observable_property::ObservableProperty;
53//! use std::sync::Arc;
54//! use std::thread;
55//!
56//! let property = Arc::new(ObservableProperty::new(0));
57//! let property_clone = property.clone();
58//!
59//! // Subscribe from one thread
60//! property.subscribe(Arc::new(|old, new| {
61//!     println!("Value changed: {} -> {}", old, new);
62//! })).map_err(|e| {
63//!     eprintln!("Failed to subscribe: {}", e);
64//!     e
65//! })?;
66//!
67//! // Modify from another thread
68//! thread::spawn(move || {
69//!     if let Err(e) = property_clone.set(42) {
70//!         eprintln!("Failed to set value: {}", e);
71//!     }
72//! }).join().expect("Thread panicked");
73//! # Ok::<(), observable_property::PropertyError>(())
74//! ```
75//!
76//! ## RAII Subscriptions (Recommended)
77//!
78//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
79//!
80//! ```rust
81//! use observable_property::ObservableProperty;
82//! use std::sync::Arc;
83//!
84//! # fn main() -> Result<(), observable_property::PropertyError> {
85//! let property = ObservableProperty::new(0);
86//!
87//! {
88//!     // Create RAII subscription - automatically cleaned up when dropped
89//!     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
90//!         println!("Value changed: {} -> {}", old, new);
91//!     }))?;
92//!
93//!     property.set(42)?; // Prints: "Value changed: 0 -> 42"
94//!
95//!     // Subscription automatically unsubscribes when leaving this scope
96//! }
97//!
98//! // No observer active anymore
99//! property.set(100)?; // No output
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! ## Filtered RAII Subscriptions
105//!
106//! Combine filtering with automatic cleanup for conditional monitoring:
107//!
108//! ```rust
109//! use observable_property::ObservableProperty;
110//! use std::sync::Arc;
111//!
112//! # fn main() -> Result<(), observable_property::PropertyError> {
113//! let temperature = ObservableProperty::new(20.0);
114//!
115//! {
116//!     // Monitor only significant temperature increases with automatic cleanup
117//!     let _heat_warning = temperature.subscribe_filtered_with_subscription(
118//!         Arc::new(|old, new| {
119//!             println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
120//!         }),
121//!         |old, new| new > old && (new - old) > 5.0
122//!     )?;
123//!
124//!     temperature.set(22.0)?; // No warning (only 2°C increase)
125//!     temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
126//!
127//!     // Subscription automatically cleaned up here
128//! }
129//!
130//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
131//! # Ok(())
132//! # }
133//! ```
134//!
135//! ## Subscription Management Comparison
136//!
137//! ```rust
138//! use observable_property::ObservableProperty;
139//! use std::sync::Arc;
140//!
141//! # fn main() -> Result<(), observable_property::PropertyError> {
142//! let property = ObservableProperty::new(0);
143//! let observer = Arc::new(|old: &i32, new: &i32| {
144//!     println!("Value: {} -> {}", old, new);
145//! });
146//!
147//! // Method 1: Manual subscription management
148//! let observer_id = property.subscribe(observer.clone())?;
149//! property.set(42)?;
150//! property.unsubscribe(observer_id)?; // Manual cleanup required
151//!
152//! // Method 2: RAII subscription management (recommended)
153//! {
154//!     let _subscription = property.subscribe_with_subscription(observer)?;
155//!     property.set(100)?;
156//!     // Automatic cleanup when _subscription goes out of scope
157//! }
158//! # Ok(())
159//! # }
160//! ```
161//!
162//! ## Advanced RAII Patterns
163//!
164//! Comprehensive example showing various RAII subscription patterns:
165//!
166//! ```rust
167//! use observable_property::ObservableProperty;
168//! use std::sync::Arc;
169//!
170//! # fn main() -> Result<(), observable_property::PropertyError> {
171//! // System monitoring example
172//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
173//! let memory_usage = ObservableProperty::new(1024); // MB
174//! let active_connections = ObservableProperty::new(0u32);
175//!
176//! // Conditional monitoring based on system state
177//! let high_load_monitoring = cpu_usage.get()? > 50.0;
178//!
179//! if high_load_monitoring {
180//!     // Critical system monitoring - active only during high load
181//!     let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
182//!         Arc::new(|old, new| {
183//!             println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
184//!         }),
185//!         |_, new| *new > 80.0
186//!     )?;
187//!
188//!     let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
189//!         Arc::new(|old, new| {
190//!             println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
191//!         }),
192//!         |_, new| *new > 8192 // > 8GB
193//!     )?;
194//!
195//!     // Simulate system load changes
196//!     cpu_usage.set(85.0)?;     // Would trigger critical alert
197//!     memory_usage.set(9216)?;  // Would trigger memory warning
198//!     
199//!     // All monitoring automatically stops when exiting this block
200//! }
201//!
202//! // Connection monitoring with scoped lifetime
203//! {
204//!     let _connection_monitor = active_connections.subscribe_with_subscription(
205//!         Arc::new(|old, new| {
206//!             if new > old {
207//!                 println!("📈 New connections: {} -> {}", old, new);
208//!             } else if new < old {
209//!                 println!("📉 Connections closed: {} -> {}", old, new);
210//!             }
211//!         })
212//!     )?;
213//!
214//!     active_connections.set(5)?;  // Prints: "📈 New connections: 0 -> 5"
215//!     active_connections.set(3)?;  // Prints: "📉 Connections closed: 5 -> 3"
216//!     active_connections.set(8)?;  // Prints: "📈 New connections: 3 -> 8"
217//!
218//!     // Connection monitoring automatically stops here
219//! }
220//!
221//! // No monitoring active anymore
222//! cpu_usage.set(95.0)?;         // No output
223//! memory_usage.set(10240)?;     // No output  
224//! active_connections.set(15)?;  // No output
225//!
226//! println!("All monitoring automatically cleaned up!");
227//! # Ok(())
228//! # }
229//! ```
230
231use std::collections::HashMap;
232use std::fmt;
233use std::mem;
234use std::panic;
235use std::sync::{Arc, RwLock};
236use std::thread;
237
238/// Maximum number of background threads used for asynchronous observer notifications
239///
240/// This constant controls the degree of parallelism when using `set_async()` to notify
241/// observers. The observer list is divided into batches, with each batch running in
242/// its own background thread, up to this maximum number of threads.
243///
244/// # Rationale
245///
246/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
247/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead  
248/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
249/// - **System Responsiveness**: Limits thread contention on multi-core systems
250///
251/// # Implementation Details
252///
253/// When `set_async()` is called:
254/// 1. All observers are collected into a snapshot
255/// 2. Observers are divided into `MAX_THREADS` batches (or fewer if there are fewer observers)
256/// 3. Each batch executes in its own `thread::spawn()` call
257/// 4. Observers within each batch are executed sequentially
258///
259/// For example, with 100 observers and `MAX_THREADS = 4`:
260/// - Batch 1: Observers 1-25 (Thread 1)
261/// - Batch 2: Observers 26-50 (Thread 2)  
262/// - Batch 3: Observers 51-75 (Thread 3)
263/// - Batch 4: Observers 76-100 (Thread 4)
264///
265/// # Tuning Considerations
266///
267/// This value can be adjusted based on your application's needs:
268/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
269/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
270/// - **Memory-constrained systems**: Lower values reduce thread overhead
271/// - **Real-time systems**: Lower values reduce scheduling unpredictability
272///
273/// # Thread Safety
274///
275/// This constant is used only during the batching calculation and does not affect
276/// the thread safety of the overall system.
277const MAX_THREADS: usize = 4;
278
279/// Maximum number of observers allowed per property instance
280///
281/// This limit prevents memory exhaustion from unbounded observer registration.
282/// Once this limit is reached, attempts to add more observers will fail with
283/// an `InvalidConfiguration` error.
284///
285/// # Rationale
286///
287/// - **Memory Protection**: Prevents unbounded memory growth from observer accumulation
288/// - **Resource Management**: Ensures predictable memory usage in long-running applications
289/// - **Early Detection**: Catches potential memory leaks from forgotten unsubscriptions
290/// - **System Stability**: Prevents out-of-memory conditions in constrained environments
291///
292/// # Tuning Considerations
293///
294/// This value can be adjusted based on your application's needs:
295/// - **High-frequency properties**: May need fewer observers to avoid notification overhead
296/// - **Low-frequency properties**: Can safely support more observers
297/// - **Memory-constrained systems**: Lower values prevent memory pressure
298/// - **Development/testing**: Higher values may be useful for comprehensive test coverage
299///
300/// # Default Value
301///
302/// The default of 10,000 observers provides generous headroom for most applications while
303/// still preventing pathological cases. In practice, most properties have fewer than 100
304/// observers.
305///
306/// # Example Scenarios
307///
308/// - **User sessions**: 1,000 concurrent users, each with 5 properties = ~5,000 observers
309/// - **IoT devices**: 5,000 devices, each with 1-2 observers = ~10,000 observers
310/// - **Monitoring system**: 100 metrics, each with 20 dashboard widgets = ~2,000 observers
311const MAX_OBSERVERS: usize = 10_000;
312
313/// Errors that can occur when working with ObservableProperty
314///
315/// # Note on Lock Poisoning
316///
317/// This implementation uses **graceful degradation** for poisoned locks. When a lock
318/// is poisoned (typically due to a panic in an observer or another thread), the
319/// library automatically recovers the inner value using [`PoisonError::into_inner()`](std::sync::PoisonError::into_inner).
320///
321/// This means:
322/// - **All operations continue to work** even after a lock is poisoned
323/// - No `ReadLockError`, `WriteLockError`, or `PoisonedLock` errors will occur in practice
324/// - The system remains operational and observers continue to function
325///
326/// The error variants are kept for backward compatibility and potential future use cases,
327/// but with the current implementation, poisoned locks are transparent to users.
328///
329/// # Production Benefit
330///
331/// This approach ensures maximum availability and resilience in production systems where
332/// a misbehaving observer shouldn't bring down the entire property system.
333#[derive(Debug, Clone)]
334pub enum PropertyError {
335    /// Failed to acquire a read lock on the property
336    ///
337    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
338    /// as poisoned locks are automatically recovered.
339    ReadLockError {
340        /// Context describing what operation was being attempted
341        context: String,
342    },
343    /// Failed to acquire a write lock on the property
344    ///
345    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
346    /// as poisoned locks are automatically recovered.
347    WriteLockError {
348        /// Context describing what operation was being attempted
349        context: String,
350    },
351    /// Attempted to unsubscribe an observer that doesn't exist
352    ObserverNotFound {
353        /// The ID of the observer that wasn't found
354        id: usize,
355    },
356    /// The property's lock has been poisoned due to a panic in another thread
357    ///
358    /// **Note**: With graceful degradation, this error will not occur in practice
359    /// as the implementation automatically recovers from poisoned locks.
360    PoisonedLock,
361    /// An observer function encountered an error during execution
362    ObserverError {
363        /// Description of what went wrong
364        reason: String,
365    },
366    /// The thread pool for async notifications is exhausted
367    ThreadPoolExhausted,
368    /// Invalid configuration was provided
369    InvalidConfiguration {
370        /// Description of the invalid configuration
371        reason: String,
372    },
373}
374
375impl fmt::Display for PropertyError {
376    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377        match self {
378            PropertyError::ReadLockError { context } => {
379                write!(f, "Failed to acquire read lock: {}", context)
380            }
381            PropertyError::WriteLockError { context } => {
382                write!(f, "Failed to acquire write lock: {}", context)
383            }
384            PropertyError::ObserverNotFound { id } => {
385                write!(f, "Observer with ID {} not found", id)
386            }
387            PropertyError::PoisonedLock => {
388                write!(
389                    f,
390                    "Property is in a poisoned state due to a panic in another thread"
391                )
392            }
393            PropertyError::ObserverError { reason } => {
394                write!(f, "Observer execution failed: {}", reason)
395            }
396            PropertyError::ThreadPoolExhausted => {
397                write!(f, "Thread pool is exhausted and cannot spawn more observers")
398            }
399            PropertyError::InvalidConfiguration { reason } => {
400                write!(f, "Invalid configuration: {}", reason)
401            }
402        }
403    }
404}
405
406impl std::error::Error for PropertyError {}
407
408/// Function type for observers that get called when property values change
409pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
410
411/// Unique identifier for registered observers
412pub type ObserverId = usize;
413
414/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
415///
416/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
417/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
418/// `Drop` implementation automatically removes the associated observer from the property.
419///
420/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
421/// leaks in scenarios where observers might otherwise be forgotten.
422///
423/// # Type Requirements
424///
425/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
426/// - `Clone`: Required for observer notifications
427/// - `Send`: Required for transferring between threads  
428/// - `Sync`: Required for concurrent access from multiple threads
429/// - `'static`: Required for observer callbacks that may outlive the original scope
430///
431/// # Examples
432///
433/// ## Basic RAII Subscription
434///
435/// ```rust
436/// use observable_property::ObservableProperty;
437/// use std::sync::Arc;
438///
439/// # fn main() -> Result<(), observable_property::PropertyError> {
440/// let property = ObservableProperty::new(0);
441///
442/// {
443///     // Create subscription - observer is automatically registered
444///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
445///         println!("Value changed: {} -> {}", old, new);
446///     }))?;
447///
448///     property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
449///
450///     // When _subscription goes out of scope here, observer is automatically removed
451/// }
452///
453/// property.set(100)?; // No observer output - subscription was automatically cleaned up
454/// # Ok(())
455/// # }
456/// ```
457///
458/// ## Cross-Thread Subscription Management
459///
460/// ```rust
461/// use observable_property::ObservableProperty;
462/// use std::sync::Arc;
463/// use std::thread;
464///
465/// # fn main() -> Result<(), observable_property::PropertyError> {
466/// let property = Arc::new(ObservableProperty::new(0));
467/// let property_clone = property.clone();
468///
469/// // Create subscription in main thread
470/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
471///     println!("Observed: {} -> {}", old, new);
472/// }))?;
473///
474/// // Move subscription to another thread for cleanup
475/// let handle = thread::spawn(move || {
476///     // Subscription is still active here
477///     let _ = property_clone.set(42); // Will trigger observer
478///     
479///     // When subscription is dropped here (end of thread), observer is cleaned up
480///     drop(subscription);
481/// });
482///
483/// handle.join().unwrap();
484/// 
485/// // Observer is no longer active
486/// property.set(100)?; // No output
487/// # Ok(())
488/// # }
489/// ```
490///
491/// ## Conditional Scoped Subscriptions
492///
493/// ```rust
494/// use observable_property::ObservableProperty;
495/// use std::sync::Arc;
496///
497/// # fn main() -> Result<(), observable_property::PropertyError> {
498/// let counter = ObservableProperty::new(0);
499/// let debug_mode = true;
500///
501/// if debug_mode {
502///     let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
503///         println!("Debug: counter {} -> {}", old, new);
504///     }))?;
505///     
506///     counter.set(1)?; // Prints debug info
507///     counter.set(2)?; // Prints debug info
508///     
509///     // Debug subscription automatically cleaned up when exiting if block
510/// }
511///
512/// counter.set(3)?; // No debug output (subscription was cleaned up)
513/// # Ok(())
514/// # }
515/// ```
516///
517/// # Thread Safety
518///
519/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
520/// sent between threads and the automatic cleanup will work correctly even if the
521/// subscription is dropped from a different thread than where it was created.
522pub struct Subscription<T: Clone + Send + Sync + 'static> {
523    inner: Arc<RwLock<InnerProperty<T>>>,
524    id: ObserverId,
525}
526
527impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
528    /// Debug implementation that shows the subscription ID without exposing internals
529    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530        f.debug_struct("Subscription")
531            .field("id", &self.id)
532            .field("inner", &"[ObservableProperty]")
533            .finish()
534    }
535}
536
537impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
538    /// Automatically removes the associated observer when the subscription is dropped
539    ///
540    /// This implementation provides automatic cleanup by removing the observer
541    /// from the property's observer list when the `Subscription` goes out of scope.
542    ///
543    /// # Error Handling
544    ///
545    /// If the property's lock is poisoned or inaccessible during cleanup, the error
546    /// is silently ignored using the `let _ = ...` pattern. This is intentional
547    /// because:
548    /// 1. Drop implementations should not panic
549    /// 2. If the property is poisoned, it's likely unusable anyway
550    /// 3. There's no meaningful way to handle cleanup errors in a destructor
551    ///
552    /// # Thread Safety
553    ///
554    /// This method is safe to call from any thread, even if the subscription
555    /// was created on a different thread.
556    fn drop(&mut self) {
557        // Graceful degradation: always attempt to clean up, even from poisoned locks
558        match self.inner.write() {
559            Ok(mut guard) => {
560                guard.observers.remove(&self.id);
561            }
562            Err(poisoned) => {
563                // Recover from poisoned lock to ensure cleanup happens
564                let mut guard = poisoned.into_inner();
565                guard.observers.remove(&self.id);
566            }
567        }
568    }
569}
570
571/// A thread-safe observable property that notifies observers when its value changes
572///
573/// This type wraps a value of type `T` and allows multiple observers to be notified
574/// whenever the value is modified. All operations are thread-safe and can be called
575/// from multiple threads concurrently.
576///
577/// # Type Requirements
578///
579/// The generic type `T` must implement:
580/// - `Clone`: Required for returning values and passing them to observers
581/// - `Send`: Required for transferring between threads
582/// - `Sync`: Required for concurrent access from multiple threads  
583/// - `'static`: Required for observer callbacks that may outlive the original scope
584///
585/// # Examples
586///
587/// ```rust
588/// use observable_property::ObservableProperty;
589/// use std::sync::Arc;
590///
591/// let property = ObservableProperty::new("initial".to_string());
592///
593/// let observer_id = property.subscribe(Arc::new(|old, new| {
594///     println!("Changed from '{}' to '{}'", old, new);
595/// })).map_err(|e| {
596///     eprintln!("Failed to subscribe: {}", e);
597///     e
598/// })?;
599///
600/// property.set("updated".to_string()).map_err(|e| {
601///     eprintln!("Failed to set value: {}", e);
602///     e
603/// })?; // Prints: Changed from 'initial' to 'updated'
604///
605/// property.unsubscribe(observer_id).map_err(|e| {
606///     eprintln!("Failed to unsubscribe: {}", e);
607///     e
608/// })?;
609/// # Ok::<(), observable_property::PropertyError>(())
610/// ```
611pub struct ObservableProperty<T> {
612    inner: Arc<RwLock<InnerProperty<T>>>,
613    max_threads: usize,
614    max_observers: usize,
615}
616
617struct InnerProperty<T> {
618    value: T,
619    observers: HashMap<ObserverId, Observer<T>>,
620    next_id: ObserverId,
621}
622
623impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
624    /// Creates a new observable property with the given initial value
625    ///
626    /// # Arguments
627    ///
628    /// * `initial_value` - The starting value for this property
629    ///
630    /// # Examples
631    ///
632    /// ```rust
633    /// use observable_property::ObservableProperty;
634    ///
635    /// let property = ObservableProperty::new(42);
636    /// match property.get() {
637    ///     Ok(value) => assert_eq!(value, 42),
638    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
639    /// }
640    /// ```
641    pub fn new(initial_value: T) -> Self {
642        Self {
643            inner: Arc::new(RwLock::new(InnerProperty {
644                value: initial_value,
645                observers: HashMap::new(),
646                next_id: 0,
647            })),
648            max_threads: MAX_THREADS,
649            max_observers: MAX_OBSERVERS,
650        }
651    }
652
653    /// Creates a new observable property with a custom maximum thread count for async notifications
654    ///
655    /// This constructor allows you to customize the maximum number of threads used for
656    /// asynchronous observer notifications via `set_async()`. This is useful for tuning
657    /// performance based on your specific use case and system constraints.
658    ///
659    /// # Arguments
660    ///
661    /// * `initial_value` - The starting value for this property
662    /// * `max_threads` - Maximum number of threads to use for async notifications.
663    ///   If 0 is provided, defaults to 4.
664    ///
665    /// # Thread Pool Behavior
666    ///
667    /// When `set_async()` is called, observers are divided into batches and each batch
668    /// runs in its own thread, up to the specified maximum. For example:
669    /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
670    /// - With 10 observers and `max_threads = 8`: 10 threads with 1 observer each
671    /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
672    ///
673    /// # Use Cases
674    ///
675    /// ## High-Throughput Systems
676    /// ```rust
677    /// use observable_property::ObservableProperty;
678    ///
679    /// // For systems with many CPU cores and CPU-bound observers
680    /// let property = ObservableProperty::with_max_threads(0, 8);
681    /// ```
682    ///
683    /// ## Resource-Constrained Systems
684    /// ```rust
685    /// use observable_property::ObservableProperty;
686    ///
687    /// // For embedded systems or memory-constrained environments
688    /// let property = ObservableProperty::with_max_threads(42, 1);
689    /// ```
690    ///
691    /// ## I/O-Heavy Observers
692    /// ```rust
693    /// use observable_property::ObservableProperty;
694    ///
695    /// // For observers that do network/database operations
696    /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
697    /// ```
698    ///
699    /// # Performance Considerations
700    ///
701    /// - **Higher values**: Better parallelism but more thread overhead and memory usage
702    /// - **Lower values**: Less overhead but potentially slower async notifications
703    /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
704    /// - **Zero value**: Automatically uses the default value (4)
705    ///
706    /// # Thread Safety
707    ///
708    /// This setting only affects async notifications (`set_async()`). Synchronous
709    /// operations (`set()`) always execute observers sequentially regardless of this setting.
710    ///
711    /// # Examples
712    ///
713    /// ```rust
714    /// use observable_property::ObservableProperty;
715    /// use std::sync::Arc;
716    ///
717    /// // Create property with custom thread pool size
718    /// let property = ObservableProperty::with_max_threads(42, 2);
719    ///
720    /// // Subscribe observers as usual
721    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
722    ///     println!("Value changed: {} -> {}", old, new);
723    /// })).expect("Failed to create subscription");
724    ///
725    /// // Async notifications will use at most 2 threads
726    /// property.set_async(100).expect("Failed to set value asynchronously");
727    /// ```
728    pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
729        let max_threads = if max_threads == 0 {
730            MAX_THREADS
731        } else {
732            max_threads
733        };
734        Self {
735            inner: Arc::new(RwLock::new(InnerProperty {
736                value: initial_value,
737                observers: HashMap::new(),
738                next_id: 0,
739            })),
740            max_threads,
741            max_observers: MAX_OBSERVERS,
742        }
743    }
744
745    /// Gets the current value of the property
746    ///
747    /// This method acquires a read lock, which allows multiple concurrent readers
748    /// but will block if a writer currently holds the lock.
749    ///
750    /// # Returns
751    ///
752    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
753    /// if the lock is poisoned.
754    ///
755    /// # Examples
756    ///
757    /// ```rust
758    /// use observable_property::ObservableProperty;
759    ///
760    /// let property = ObservableProperty::new("hello".to_string());
761    /// match property.get() {
762    ///     Ok(value) => assert_eq!(value, "hello"),
763    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
764    /// }
765    /// ```
766    pub fn get(&self) -> Result<T, PropertyError> {
767        match self.inner.read() {
768            Ok(prop) => Ok(prop.value.clone()),
769            Err(poisoned) => {
770                // Graceful degradation: recover value from poisoned lock
771                // This allows continued operation even after a panic in another thread
772                Ok(poisoned.into_inner().value.clone())
773            }
774        }
775    }
776
777    /// Sets the property to a new value and notifies all observers
778    ///
779    /// This method will:
780    /// 1. Acquire a write lock (blocking other readers/writers)
781    /// 2. Update the value and capture a snapshot of observers
782    /// 3. Release the lock
783    /// 4. Notify all observers sequentially with the old and new values
784    ///
785    /// Observer notifications are wrapped in panic recovery to prevent one
786    /// misbehaving observer from affecting others.
787    ///
788    /// # Arguments
789    ///
790    /// * `new_value` - The new value to set
791    ///
792    /// # Returns
793    ///
794    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
795    ///
796    /// # Examples
797    ///
798    /// ```rust
799    /// use observable_property::ObservableProperty;
800    /// use std::sync::Arc;
801    ///
802    /// let property = ObservableProperty::new(10);
803    ///
804    /// property.subscribe(Arc::new(|old, new| {
805    ///     println!("Value changed from {} to {}", old, new);
806    /// })).map_err(|e| {
807    ///     eprintln!("Failed to subscribe: {}", e);
808    ///     e
809    /// })?;
810    ///
811    /// property.set(20).map_err(|e| {
812    ///     eprintln!("Failed to set property value: {}", e);
813    ///     e
814    /// })?; // Triggers observer notification
815    /// # Ok::<(), observable_property::PropertyError>(())
816    /// ```
817    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
818        let (old_value, observers_snapshot) = {
819            let mut prop = match self.inner.write() {
820                Ok(guard) => guard,
821                Err(poisoned) => {
822                    // Graceful degradation: recover from poisoned write lock
823                    // Clear the poison flag by taking ownership of the inner value
824                    poisoned.into_inner()
825                }
826            };
827
828            // Performance optimization: use mem::replace to avoid one clone operation
829            let old_value = mem::replace(&mut prop.value, new_value.clone());
830            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
831            (old_value, observers_snapshot)
832        };
833
834        for observer in observers_snapshot {
835            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
836                observer(&old_value, &new_value);
837            })) {
838                eprintln!("Observer panic: {:?}", e);
839            }
840        }
841
842        Ok(())
843    }
844
845    /// Sets the property to a new value and notifies observers asynchronously
846    ///
847    /// This method is similar to `set()` but spawns observers in background threads
848    /// for non-blocking operation. This is useful when observers might perform
849    /// time-consuming operations.
850    ///
851    /// Observers are batched into groups and each batch runs in its own thread
852    /// to limit resource usage while still providing parallelism.
853    ///
854    /// # Thread Management (Fire-and-Forget Pattern)
855    ///
856    /// **Important**: This method uses a fire-and-forget pattern. Spawned threads are
857    /// **not joined** and run independently in the background. This design is intentional
858    /// for non-blocking behavior but has important implications:
859    ///
860    /// ## Characteristics:
861    /// - ✅ **Non-blocking**: Returns immediately without waiting for observers
862    /// - ✅ **High performance**: No synchronization overhead
863    /// - ⚠️ **No completion guarantee**: Thread may still be running when method returns
864    /// - ⚠️ **No error propagation**: Observer errors are logged but not returned
865    /// - ⚠️ **Testing caveat**: May need explicit delays to observe side effects
866    /// - ⚠️ **Ordering caveat**: Multiple rapid `set_async()` calls may result in observers
867    ///   receiving notifications out of order due to thread scheduling. Use `set()` if
868    ///   sequential ordering is critical.
869    ///
870    /// ## Use Cases:
871    /// - **UI updates**: Fire updates without blocking the main thread
872    /// - **Logging**: Asynchronous logging that doesn't block operations
873    /// - **Metrics**: Non-critical telemetry that can be lost
874    /// - **Notifications**: Fire-and-forget alerts or messages
875    ///
876    /// ## When NOT to Use:
877    /// - **Critical operations**: Use `set()` if you need guarantees
878    /// - **Transactional updates**: Use `set()` for atomic operations
879    /// - **Sequential dependencies**: If next operation depends on observer completion
880    ///
881    /// ## Testing Considerations:
882    /// ```rust
883    /// use observable_property::ObservableProperty;
884    /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
885    /// use std::time::Duration;
886    ///
887    /// # fn main() -> Result<(), observable_property::PropertyError> {
888    /// let property = ObservableProperty::new(0);
889    /// let was_called = Arc::new(AtomicBool::new(false));
890    /// let flag = was_called.clone();
891    ///
892    /// property.subscribe(Arc::new(move |_, _| {
893    ///     flag.store(true, Ordering::SeqCst);
894    /// }))?;
895    ///
896    /// property.set_async(42)?;
897    ///
898    /// // ⚠️ Immediate check might fail - thread may not have run yet
899    /// // assert!(was_called.load(Ordering::SeqCst)); // May fail!
900    ///
901    /// // ✅ Add a small delay to allow background thread to complete
902    /// std::thread::sleep(Duration::from_millis(10));
903    /// assert!(was_called.load(Ordering::SeqCst)); // Now reliable
904    /// # Ok(())
905    /// # }
906    /// ```
907    ///
908    /// # Arguments
909    ///
910    /// * `new_value` - The new value to set
911    ///
912    /// # Returns
913    ///
914    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
915    /// Note that this only indicates the property was updated successfully;
916    /// observer execution happens asynchronously and errors are not returned.
917    ///
918    /// # Examples
919    ///
920    /// ## Basic Usage
921    ///
922    /// ```rust
923    /// use observable_property::ObservableProperty;
924    /// use std::sync::Arc;
925    /// use std::time::Duration;
926    ///
927    /// let property = ObservableProperty::new(0);
928    ///
929    /// property.subscribe(Arc::new(|old, new| {
930    ///     // This observer does slow work but won't block the caller
931    ///     std::thread::sleep(Duration::from_millis(100));
932    ///     println!("Slow observer: {} -> {}", old, new);
933    /// })).map_err(|e| {
934    ///     eprintln!("Failed to subscribe: {}", e);
935    ///     e
936    /// })?;
937    ///
938    /// // This returns immediately even though observer is slow
939    /// property.set_async(42).map_err(|e| {
940    ///     eprintln!("Failed to set value asynchronously: {}", e);
941    ///     e
942    /// })?;
943    ///
944    /// // Continue working immediately - observer runs in background
945    /// println!("Main thread continues without waiting");
946    /// # Ok::<(), observable_property::PropertyError>(())
947    /// ```
948    ///
949    /// ## Multiple Rapid Updates
950    ///
951    /// ```rust
952    /// use observable_property::ObservableProperty;
953    /// use std::sync::Arc;
954    ///
955    /// # fn main() -> Result<(), observable_property::PropertyError> {
956    /// let property = ObservableProperty::new(0);
957    ///
958    /// property.subscribe(Arc::new(|old, new| {
959    ///     // Expensive operation (e.g., database update, API call)
960    ///     println!("Processing: {} -> {}", old, new);
961    /// }))?;
962    ///
963    /// // All of these return immediately - observers run in parallel
964    /// property.set_async(1)?;
965    /// property.set_async(2)?;
966    /// property.set_async(3)?;
967    /// property.set_async(4)?;
968    /// property.set_async(5)?;
969    ///
970    /// // All observer calls are now running in background threads
971    /// # Ok(())
972    /// # }
973    /// ```
974    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
975        let (old_value, observers_snapshot) = {
976            let mut prop = match self.inner.write() {
977                Ok(guard) => guard,
978                Err(poisoned) => {
979                    // Graceful degradation: recover from poisoned write lock
980                    poisoned.into_inner()
981                }
982            };
983
984            // Performance optimization: use mem::replace to avoid one clone operation
985            let old_value = mem::replace(&mut prop.value, new_value.clone());
986            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
987            (old_value, observers_snapshot)
988        };
989
990        if observers_snapshot.is_empty() {
991            return Ok(());
992        }
993
994        let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
995
996        // Fire-and-forget pattern: Spawn threads without joining
997        // This is intentional for non-blocking behavior. Observers run independently
998        // and the caller continues immediately without waiting for completion.
999        // Trade-offs:
1000        //   ✅ Non-blocking, high performance
1001        //   ⚠️ No completion guarantee, no error propagation to caller
1002        for batch in observers_snapshot.chunks(observers_per_thread) {
1003            let batch_observers = batch.to_vec();
1004            let old_val = old_value.clone();
1005            let new_val = new_value.clone();
1006
1007            thread::spawn(move || {
1008                for observer in batch_observers {
1009                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1010                        observer(&old_val, &new_val);
1011                    })) {
1012                        eprintln!("Observer panic in batch thread: {:?}", e);
1013                    }
1014                }
1015            });
1016            // Thread handle intentionally dropped - fire-and-forget pattern
1017        }
1018
1019        Ok(())
1020    }
1021
1022    /// Subscribes an observer function to be called when the property changes
1023    ///
1024    /// The observer function will be called with the old and new values whenever
1025    /// the property is modified via `set()` or `set_async()`.
1026    ///
1027    /// # Arguments
1028    ///
1029    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1030    ///
1031    /// # Returns
1032    ///
1033    /// `Ok(ObserverId)` containing a unique identifier for this observer,
1034    /// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1035    ///
1036    /// # Observer Limit
1037    ///
1038    /// To prevent memory exhaustion, there is a maximum limit of observers per property
1039    /// (currently set to 10,000). If you attempt to add more observers than this limit,
1040    /// the subscription will fail with an `InvalidConfiguration` error.
1041    ///
1042    /// This protection helps prevent:
1043    /// - Memory leaks from forgotten unsubscriptions
1044    /// - Unbounded memory growth in long-running applications
1045    /// - Out-of-memory conditions in resource-constrained environments
1046    ///
1047    /// # Examples
1048    ///
1049    /// ```rust
1050    /// use observable_property::ObservableProperty;
1051    /// use std::sync::Arc;
1052    ///
1053    /// let property = ObservableProperty::new(0);
1054    ///
1055    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
1056    ///     println!("Property changed from {} to {}", old_value, new_value);
1057    /// })).map_err(|e| {
1058    ///     eprintln!("Failed to subscribe observer: {}", e);
1059    ///     e
1060    /// })?;
1061    ///
1062    /// // Later, unsubscribe using the returned ID
1063    /// property.unsubscribe(observer_id).map_err(|e| {
1064    ///     eprintln!("Failed to unsubscribe observer: {}", e);
1065    ///     e
1066    /// })?;
1067    /// # Ok::<(), observable_property::PropertyError>(())
1068    /// ```
1069    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
1070        let mut prop = match self.inner.write() {
1071            Ok(guard) => guard,
1072            Err(poisoned) => {
1073                // Graceful degradation: recover from poisoned write lock
1074                poisoned.into_inner()
1075            }
1076        };
1077
1078        // Check observer limit to prevent memory exhaustion
1079        if prop.observers.len() >= self.max_observers {
1080            return Err(PropertyError::InvalidConfiguration {
1081                reason: format!(
1082                    "Maximum observer limit ({}) exceeded. Current observers: {}. \
1083                     Consider unsubscribing unused observers to free resources.",
1084                    self.max_observers,
1085                    prop.observers.len()
1086                ),
1087            });
1088        }
1089
1090        let id = prop.next_id;
1091        // Use wrapping_add to prevent overflow panics in production
1092        // After ~usize::MAX subscriptions, IDs will wrap around
1093        // This is acceptable as old observers are typically unsubscribed
1094        prop.next_id = prop.next_id.wrapping_add(1);
1095        prop.observers.insert(id, observer);
1096        Ok(id)
1097    }
1098
1099    /// Removes an observer identified by its ID
1100    ///
1101    /// # Arguments
1102    ///
1103    /// * `id` - The observer ID returned by `subscribe()`
1104    ///
1105    /// # Returns
1106    ///
1107    /// `Ok(bool)` where `true` means the observer was found and removed,
1108    /// `false` means no observer with that ID existed.
1109    /// Returns `Err(PropertyError)` if the lock is poisoned.
1110    ///
1111    /// # Examples
1112    ///
1113    /// ```rust
1114    /// use observable_property::ObservableProperty;
1115    /// use std::sync::Arc;
1116    ///
1117    /// let property = ObservableProperty::new(0);
1118    /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
1119    ///     eprintln!("Failed to subscribe: {}", e);
1120    ///     e
1121    /// })?;
1122    ///
1123    /// let was_removed = property.unsubscribe(id).map_err(|e| {
1124    ///     eprintln!("Failed to unsubscribe: {}", e);
1125    ///     e
1126    /// })?;
1127    /// assert!(was_removed); // Observer existed and was removed
1128    ///
1129    /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
1130    ///     eprintln!("Failed to unsubscribe again: {}", e);
1131    ///     e
1132    /// })?;
1133    /// assert!(!was_removed_again); // Observer no longer exists
1134    /// # Ok::<(), observable_property::PropertyError>(())
1135    /// ```
1136    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
1137        let mut prop = match self.inner.write() {
1138            Ok(guard) => guard,
1139            Err(poisoned) => {
1140                // Graceful degradation: recover from poisoned write lock
1141                poisoned.into_inner()
1142            }
1143        };
1144
1145        let was_present = prop.observers.remove(&id).is_some();
1146        Ok(was_present)
1147    }
1148
1149    /// Subscribes an observer that only gets called when a filter condition is met
1150    ///
1151    /// This is useful for observing only specific types of changes, such as
1152    /// when a value increases or crosses a threshold.
1153    ///
1154    /// # Arguments
1155    ///
1156    /// * `observer` - The observer function to call when the filter passes
1157    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1158    ///
1159    /// # Returns
1160    ///
1161    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
1162    ///
1163    /// # Examples
1164    ///
1165    /// ```rust
1166    /// use observable_property::ObservableProperty;
1167    /// use std::sync::Arc;
1168    ///
1169    /// let property = ObservableProperty::new(0);
1170    ///
1171    /// // Only notify when value increases
1172    /// let id = property.subscribe_filtered(
1173    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1174    ///     |old, new| new > old
1175    /// ).map_err(|e| {
1176    ///     eprintln!("Failed to subscribe filtered observer: {}", e);
1177    ///     e
1178    /// })?;
1179    ///
1180    /// property.set(10).map_err(|e| {
1181    ///     eprintln!("Failed to set value: {}", e);
1182    ///     e
1183    /// })?; // Triggers observer (0 -> 10)
1184    /// property.set(5).map_err(|e| {
1185    ///     eprintln!("Failed to set value: {}", e);
1186    ///     e
1187    /// })?;  // Does NOT trigger observer (10 -> 5)
1188    /// property.set(15).map_err(|e| {
1189    ///     eprintln!("Failed to set value: {}", e);
1190    ///     e
1191    /// })?; // Triggers observer (5 -> 15)
1192    /// # Ok::<(), observable_property::PropertyError>(())
1193    /// ```
1194    pub fn subscribe_filtered<F>(
1195        &self,
1196        observer: Observer<T>,
1197        filter: F,
1198    ) -> Result<ObserverId, PropertyError>
1199    where
1200        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1201    {
1202        let filter = Arc::new(filter);
1203        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1204            if filter(old_val, new_val) {
1205                observer(old_val, new_val);
1206            }
1207        });
1208
1209        self.subscribe(filtered_observer)
1210    }
1211
1212    /// Notifies all observers with a batch of changes
1213    ///
1214    /// This method allows you to trigger observer notifications for multiple
1215    /// value changes efficiently. Unlike individual `set()` calls, this method
1216    /// acquires the observer list once and then notifies all observers with each
1217    /// change in the batch.
1218    ///
1219    /// # Performance Characteristics
1220    ///
1221    /// - **Lock optimization**: Acquires read lock only to snapshot observers, then releases it
1222    /// - **Non-blocking**: Other operations can proceed during observer notifications
1223    /// - **Panic isolation**: Individual observer panics don't affect other observers
1224    ///
1225    /// # Arguments
1226    ///
1227    /// * `changes` - A vector of tuples `(old_value, new_value)` to notify observers about
1228    ///
1229    /// # Returns
1230    ///
1231    /// `Ok(())` if successful. Observer errors are logged but don't cause the method to fail.
1232    ///
1233    /// # Examples
1234    ///
1235    /// ```rust
1236    /// use observable_property::ObservableProperty;
1237    /// use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
1238    ///
1239    /// # fn main() -> Result<(), observable_property::PropertyError> {
1240    /// let property = ObservableProperty::new(0);
1241    /// let call_count = Arc::new(AtomicUsize::new(0));
1242    /// let count_clone = call_count.clone();
1243    ///
1244    /// property.subscribe(Arc::new(move |old, new| {
1245    ///     count_clone.fetch_add(1, Ordering::SeqCst);
1246    ///     println!("Change: {} -> {}", old, new);
1247    /// }))?;
1248    ///
1249    /// // Notify with multiple changes at once
1250    /// property.notify_observers_batch(vec![
1251    ///     (0, 10),
1252    ///     (10, 20),
1253    ///     (20, 30),
1254    /// ])?;
1255    ///
1256    /// assert_eq!(call_count.load(Ordering::SeqCst), 3);
1257    /// # Ok(())
1258    /// # }
1259    /// ```
1260    ///
1261    /// # Note
1262    ///
1263    /// This method does NOT update the property's actual value - it only triggers
1264    /// observer notifications. Use `set()` if you want to update the value and
1265    /// notify observers.
1266    pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1267        // Acquire lock, clone observers, then release lock immediately
1268        // This prevents blocking other operations during potentially long notification process
1269        let observers_snapshot = {
1270            let prop = match self.inner.read() {
1271                Ok(guard) => guard,
1272                Err(poisoned) => {
1273                    // Graceful degradation: recover from poisoned read lock
1274                    poisoned.into_inner()
1275                }
1276            };
1277            prop.observers.values().cloned().collect::<Vec<_>>()
1278        }; // Lock released here
1279
1280        // Notify observers without holding the lock
1281        for (old_val, new_val) in changes {
1282            for observer in &observers_snapshot {
1283                // Wrap in panic recovery like other notification methods
1284                if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1285                    observer(&old_val, &new_val);
1286                })) {
1287                    eprintln!("Observer panic in batch notification: {:?}", e);
1288                }
1289            }
1290        }
1291        Ok(())
1292    }
1293
1294    /// Subscribes an observer and returns a RAII guard for automatic cleanup
1295    ///
1296    /// This method is similar to `subscribe()` but returns a `Subscription` object
1297    /// that automatically removes the observer when it goes out of scope. This
1298    /// provides a more convenient and safer alternative to manual subscription
1299    /// management.
1300    ///
1301    /// # Arguments
1302    ///
1303    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1304    ///
1305    /// # Returns
1306    ///
1307    /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
1308    /// or `Err(PropertyError)` if the lock is poisoned.
1309    ///
1310    /// # Examples
1311    ///
1312    /// ## Basic RAII Subscription
1313    ///
1314    /// ```rust
1315    /// use observable_property::ObservableProperty;
1316    /// use std::sync::Arc;
1317    ///
1318    /// # fn main() -> Result<(), observable_property::PropertyError> {
1319    /// let property = ObservableProperty::new(0);
1320    ///
1321    /// {
1322    ///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1323    ///         println!("Value: {} -> {}", old, new);
1324    ///     }))?;
1325    ///
1326    ///     property.set(42)?; // Prints: "Value: 0 -> 42"
1327    ///     property.set(100)?; // Prints: "Value: 42 -> 100"
1328    ///
1329    ///     // Automatic cleanup when _subscription goes out of scope
1330    /// }
1331    ///
1332    /// property.set(200)?; // No output - subscription was cleaned up
1333    /// # Ok(())
1334    /// # }
1335    /// ```
1336    ///
1337    /// ## Comparison with Manual Management
1338    ///
1339    /// ```rust
1340    /// use observable_property::ObservableProperty;
1341    /// use std::sync::Arc;
1342    ///
1343    /// # fn main() -> Result<(), observable_property::PropertyError> {
1344    /// let property = ObservableProperty::new("initial".to_string());
1345    ///
1346    /// // Method 1: Manual subscription management (traditional approach)
1347    /// let observer_id = property.subscribe(Arc::new(|old, new| {
1348    ///     println!("Manual: {} -> {}", old, new);
1349    /// }))?;
1350    ///
1351    /// // Method 2: RAII subscription management (recommended)
1352    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1353    ///     println!("RAII: {} -> {}", old, new);
1354    /// }))?;
1355    ///
1356    /// // Both observers will be called
1357    /// property.set("changed".to_string())?;
1358    /// // Prints:
1359    /// // "Manual: initial -> changed"
1360    /// // "RAII: initial -> changed"
1361    ///
1362    /// // Manual cleanup required for first observer
1363    /// property.unsubscribe(observer_id)?;
1364    ///
1365    /// // Second observer (_subscription) is automatically cleaned up when
1366    /// // the variable goes out of scope - no manual intervention needed
1367    /// # Ok(())
1368    /// # }
1369    /// ```
1370    ///
1371    /// ## Error Handling with Early Returns
1372    ///
1373    /// ```rust
1374    /// use observable_property::ObservableProperty;
1375    /// use std::sync::Arc;
1376    ///
1377    /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1378    ///     let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1379    ///         println!("Processing: {} -> {}", old, new);
1380    ///     }))?;
1381    ///
1382    ///     property.set(1)?;
1383    ///     
1384    ///     if property.get()? > 0 {
1385    ///         return Ok(()); // Subscription automatically cleaned up on early return
1386    ///     }
1387    ///
1388    ///     property.set(2)?;
1389    ///     Ok(()) // Subscription automatically cleaned up on normal return
1390    /// }
1391    ///
1392    /// # fn main() -> Result<(), observable_property::PropertyError> {
1393    /// let property = ObservableProperty::new(0);
1394    /// process_with_monitoring(&property)?; // Monitoring active only during function call
1395    /// property.set(99)?; // No monitoring output - subscription was cleaned up
1396    /// # Ok(())
1397    /// # }
1398    /// ```
1399    ///
1400    /// ## Multi-threaded Subscription Management
1401    ///
1402    /// ```rust
1403    /// use observable_property::ObservableProperty;
1404    /// use std::sync::Arc;
1405    /// use std::thread;
1406    ///
1407    /// # fn main() -> Result<(), observable_property::PropertyError> {
1408    /// let property = Arc::new(ObservableProperty::new(0));
1409    /// let property_clone = property.clone();
1410    ///
1411    /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1412    ///     let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1413    ///         println!("Thread observer: {} -> {}", old, new);
1414    ///     }))?;
1415    ///
1416    ///     property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1417    ///     
1418    ///     // Subscription automatically cleaned up when thread ends
1419    ///     Ok(())
1420    /// });
1421    ///
1422    /// handle.join().unwrap()?;
1423    /// property.set(100)?; // No output - thread subscription was cleaned up
1424    /// # Ok(())
1425    /// # }
1426    /// ```
1427    ///
1428    /// # Use Cases
1429    ///
1430    /// This method is particularly useful in scenarios such as:
1431    /// - Temporary observers that should be active only during a specific scope
1432    /// - Error-prone code where manual cleanup might be forgotten
1433    /// - Complex control flow where multiple exit points make manual cleanup difficult
1434    /// - Resource-constrained environments where observer leaks are problematic
1435    pub fn subscribe_with_subscription(
1436        &self,
1437        observer: Observer<T>,
1438    ) -> Result<Subscription<T>, PropertyError> {
1439        let id = self.subscribe(observer)?;
1440        Ok(Subscription {
1441            inner: Arc::clone(&self.inner),
1442            id,
1443        })
1444    }
1445
1446    /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1447    ///
1448    /// This method combines the functionality of `subscribe_filtered()` with the automatic
1449    /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1450    /// called when the filter condition is satisfied, and it will be automatically
1451    /// unsubscribed when the returned `Subscription` goes out of scope.
1452    ///
1453    /// # Arguments
1454    ///
1455    /// * `observer` - The observer function to call when the filter passes
1456    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1457    ///
1458    /// # Returns
1459    ///
1460    /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1461    /// or `Err(PropertyError)` if the lock is poisoned.
1462    ///
1463    /// # Examples
1464    ///
1465    /// ## Basic Filtered RAII Subscription
1466    ///
1467    /// ```rust
1468    /// use observable_property::ObservableProperty;
1469    /// use std::sync::Arc;
1470    ///
1471    /// # fn main() -> Result<(), observable_property::PropertyError> {
1472    /// let counter = ObservableProperty::new(0);
1473    ///
1474    /// {
1475    ///     // Monitor only increases with automatic cleanup
1476    ///     let _increase_monitor = counter.subscribe_filtered_with_subscription(
1477    ///         Arc::new(|old, new| {
1478    ///             println!("Counter increased: {} -> {}", old, new);
1479    ///         }),
1480    ///         |old, new| new > old
1481    ///     )?;
1482    ///
1483    ///     counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
1484    ///     counter.set(3)?;  // No output (decrease)
1485    ///     counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"
1486    ///
1487    ///     // Subscription automatically cleaned up when leaving scope
1488    /// }
1489    ///
1490    /// counter.set(10)?; // No output - subscription was cleaned up
1491    /// # Ok(())
1492    /// # }
1493    /// ```
1494    ///
1495    /// ## Multi-Condition Temperature Monitoring
1496    ///
1497    /// ```rust
1498    /// use observable_property::ObservableProperty;
1499    /// use std::sync::Arc;
1500    ///
1501    /// # fn main() -> Result<(), observable_property::PropertyError> {
1502    /// let temperature = ObservableProperty::new(20.0_f64);
1503    ///
1504    /// {
1505    ///     // Create filtered subscription that only triggers for significant temperature increases
1506    ///     let _heat_warning = temperature.subscribe_filtered_with_subscription(
1507    ///         Arc::new(|old_temp, new_temp| {
1508    ///             println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1509    ///                      old_temp, new_temp);
1510    ///         }),
1511    ///         |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
1512    ///     )?;
1513    ///
1514    ///     // Create another filtered subscription for cooling alerts
1515    ///     let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1516    ///         Arc::new(|old_temp, new_temp| {
1517    ///             println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1518    ///                      old_temp, new_temp);
1519    ///         }),
1520    ///         |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
1521    ///     )?;
1522    ///
1523    ///     // Test the filters
1524    ///     temperature.set(22.0)?; // No alerts (increase of only 2°C)
1525    ///     temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1526    ///     temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1527    ///
1528    ///     // Both subscriptions are automatically cleaned up when they go out of scope
1529    /// }
1530    ///
1531    /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1532    /// # Ok(())
1533    /// # }
1534    /// ```
1535    ///
1536    /// ## Conditional Monitoring with Complex Filters
1537    ///
1538    /// ```rust
1539    /// use observable_property::ObservableProperty;
1540    /// use std::sync::Arc;
1541    ///
1542    /// # fn main() -> Result<(), observable_property::PropertyError> {
1543    /// let stock_price = ObservableProperty::new(100.0_f64);
1544    ///
1545    /// {
1546    ///     // Monitor significant price movements (> 5% change)
1547    ///     let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1548    ///         Arc::new(|old_price, new_price| {
1549    ///             let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1550    ///             println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1551    ///                     old_price, new_price, change_percent);
1552    ///         }),
1553    ///         |old, new| {
1554    ///             let change_percent = ((new - old) / old * 100.0).abs();
1555    ///             change_percent > 5.0  // Trigger on > 5% change
1556    ///         }
1557    ///     )?;
1558    ///
1559    ///     stock_price.set(103.0)?; // No alert (3% change)
1560    ///     stock_price.set(108.0)?; // Alert triggered (4.85% from 103, but let's say it rounds up)
1561    ///     stock_price.set(95.0)?;  // Alert triggered (12% decrease)
1562    ///
1563    ///     // Subscription automatically cleaned up when leaving scope
1564    /// }
1565    ///
1566    /// stock_price.set(200.0)?; // No alert - monitoring ended
1567    /// # Ok(())
1568    /// # }
1569    /// ```
1570    ///
1571    /// ## Cross-Thread Filtered Monitoring
1572    ///
1573    /// ```rust
1574    /// use observable_property::ObservableProperty;
1575    /// use std::sync::Arc;
1576    /// use std::thread;
1577    /// use std::time::Duration;
1578    ///
1579    /// # fn main() -> Result<(), observable_property::PropertyError> {
1580    /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1581    /// let latency_clone = network_latency.clone();
1582    ///
1583    /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1584    ///     // Monitor high latency in background thread with automatic cleanup
1585    ///     let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1586    ///         Arc::new(|old_ms, new_ms| {
1587    ///             println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1588    ///         }),
1589    ///         |_, new| *new > 100  // Alert when latency exceeds 100ms
1590    ///     )?;
1591    ///
1592    ///     // Simulate monitoring for a short time
1593    ///     thread::sleep(Duration::from_millis(10));
1594    ///     
1595    ///     // Subscription automatically cleaned up when thread ends
1596    ///     Ok(())
1597    /// });
1598    ///
1599    /// // Simulate network conditions
1600    /// network_latency.set(80)?;  // No alert (under threshold)
1601    /// network_latency.set(150)?; // Alert triggered in background thread
1602    ///
1603    /// monitor_handle.join().unwrap()?;
1604    /// network_latency.set(200)?; // No alert - background monitoring ended
1605    /// # Ok(())
1606    /// # }
1607    /// ```
1608    ///
1609    /// # Use Cases
1610    ///
1611    /// This method is ideal for:
1612    /// - Threshold-based monitoring with automatic cleanup
1613    /// - Temporary conditional observers in specific code blocks
1614    /// - Event-driven systems where observers should be active only during certain phases
1615    /// - Resource management scenarios where filtered observers have limited lifetimes
1616    ///
1617    /// # Performance Notes
1618    ///
1619    /// The filter function is evaluated for every property change, so it should be
1620    /// lightweight. Complex filtering logic should be optimized to avoid performance
1621    /// bottlenecks, especially in high-frequency update scenarios.
1622    pub fn subscribe_filtered_with_subscription<F>(
1623        &self,
1624        observer: Observer<T>,
1625        filter: F,
1626    ) -> Result<Subscription<T>, PropertyError>
1627    where
1628        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1629    {
1630        let id = self.subscribe_filtered(observer, filter)?;
1631        Ok(Subscription {
1632            inner: Arc::clone(&self.inner),
1633            id,
1634        })
1635    }
1636
1637    /// Creates a new observable property with full configuration control
1638    ///
1639    /// This constructor provides complete control over the property's configuration,
1640    /// allowing you to customize both thread pool size and maximum observer count.
1641    ///
1642    /// # Arguments
1643    ///
1644    /// * `initial_value` - The starting value for this property
1645    /// * `max_threads` - Maximum threads for async notifications (0 = use default)
1646    /// * `max_observers` - Maximum number of allowed observers (0 = use default)
1647    ///
1648    /// # Examples
1649    ///
1650    /// ```rust
1651    /// use observable_property::ObservableProperty;
1652    ///
1653    /// // Create a property optimized for high-frequency updates with many observers
1654    /// let property = ObservableProperty::with_config(0, 8, 50000);
1655    /// assert_eq!(property.get().unwrap(), 0);
1656    /// ```
1657    pub fn with_config(initial_value: T, max_threads: usize, max_observers: usize) -> Self {
1658        Self {
1659            inner: Arc::new(RwLock::new(InnerProperty {
1660                value: initial_value,
1661                observers: HashMap::new(),
1662                next_id: 0,
1663            })),
1664            max_threads: if max_threads == 0 { MAX_THREADS } else { max_threads },
1665            max_observers: if max_observers == 0 { MAX_OBSERVERS } else { max_observers },
1666        }
1667    }
1668
1669    /// Returns the current number of active observers
1670    ///
1671    /// This method is useful for debugging, monitoring, and testing to verify
1672    /// that observers are being properly managed and cleaned up.
1673    ///
1674    /// # Returns
1675    ///
1676    /// The number of currently subscribed observers, or 0 if the lock is poisoned.
1677    ///
1678    /// # Examples
1679    ///
1680    /// ```rust
1681    /// use observable_property::ObservableProperty;
1682    /// use std::sync::Arc;
1683    ///
1684    /// # fn main() -> Result<(), observable_property::PropertyError> {
1685    /// let property = ObservableProperty::new(42);
1686    /// assert_eq!(property.observer_count(), 0);
1687    ///
1688    /// let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1689    /// assert_eq!(property.observer_count(), 1);
1690    ///
1691    /// let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1692    /// assert_eq!(property.observer_count(), 2);
1693    ///
1694    /// property.unsubscribe(id1)?;
1695    /// assert_eq!(property.observer_count(), 1);
1696    /// # Ok(())
1697    /// # }
1698    /// ```
1699    pub fn observer_count(&self) -> usize {
1700        match self.inner.read() {
1701            Ok(prop) => prop.observers.len(),
1702            Err(poisoned) => {
1703                // Graceful degradation: recover from poisoned lock
1704                poisoned.into_inner().observers.len()
1705            }
1706        }
1707    }
1708
1709    /// Gets the current value without Result wrapping
1710    ///
1711    /// This is a convenience method that returns `None` if the lock is poisoned
1712    /// (which shouldn't happen with graceful degradation) instead of a Result.
1713    ///
1714    /// # Returns
1715    ///
1716    /// `Some(T)` containing the current value, or `None` if somehow inaccessible.
1717    ///
1718    /// # Examples
1719    ///
1720    /// ```rust
1721    /// use observable_property::ObservableProperty;
1722    ///
1723    /// let property = ObservableProperty::new(42);
1724    /// assert_eq!(property.try_get(), Some(42));
1725    /// ```
1726    pub fn try_get(&self) -> Option<T> {
1727        self.get().ok()
1728    }
1729
1730    /// Atomically modifies the property value using a closure
1731    ///
1732    /// This method allows you to update the property based on its current value
1733    /// in a single atomic operation. The closure receives a mutable reference to
1734    /// the value and can modify it in place.
1735    ///
1736    /// # Arguments
1737    ///
1738    /// * `f` - A closure that receives `&mut T` and modifies it
1739    ///
1740    /// # Returns
1741    ///
1742    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
1743    ///
1744    /// # Examples
1745    ///
1746    /// ```rust
1747    /// use observable_property::ObservableProperty;
1748    /// use std::sync::Arc;
1749    ///
1750    /// # fn main() -> Result<(), observable_property::PropertyError> {
1751    /// let counter = ObservableProperty::new(0);
1752    ///
1753    /// counter.subscribe(Arc::new(|old, new| {
1754    ///     println!("Counter: {} -> {}", old, new);
1755    /// }))?;
1756    ///
1757    /// // Increment counter atomically
1758    /// counter.modify(|value| *value += 1)?;
1759    /// assert_eq!(counter.get()?, 1);
1760    ///
1761    /// // Double the counter atomically
1762    /// counter.modify(|value| *value *= 2)?;
1763    /// assert_eq!(counter.get()?, 2);
1764    /// # Ok(())
1765    /// # }
1766    /// ```
1767    pub fn modify<F>(&self, f: F) -> Result<(), PropertyError>
1768    where
1769        F: FnOnce(&mut T),
1770    {
1771        let (old_value, new_value, observers_snapshot) = {
1772            let mut prop = match self.inner.write() {
1773                Ok(guard) => guard,
1774                Err(poisoned) => {
1775                    // Graceful degradation: recover from poisoned write lock
1776                    poisoned.into_inner()
1777                }
1778            };
1779
1780            let old_value = prop.value.clone();
1781            f(&mut prop.value);
1782            let new_value = prop.value.clone();
1783            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
1784            (old_value, new_value, observers_snapshot)
1785        };
1786
1787        // Notify observers with old and new values
1788        for observer in observers_snapshot {
1789            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1790                observer(&old_value, &new_value);
1791            })) {
1792                eprintln!("Observer panic in modify: {:?}", e);
1793            }
1794        }
1795
1796        Ok(())
1797    }
1798}
1799
1800impl<T: Clone + Default + Send + Sync + 'static> ObservableProperty<T> {
1801    /// Gets the current value or returns the default if inaccessible
1802    ///
1803    /// This convenience method is only available when `T` implements `Default`.
1804    /// It provides a fallback to `T::default()` if the value cannot be read.
1805    ///
1806    /// # Examples
1807    ///
1808    /// ```rust
1809    /// use observable_property::ObservableProperty;
1810    ///
1811    /// let property = ObservableProperty::new(42);
1812    /// assert_eq!(property.get_or_default(), 42);
1813    ///
1814    /// // Even if somehow inaccessible, returns default
1815    /// let empty_property: ObservableProperty<i32> = ObservableProperty::new(0);
1816    /// assert_eq!(empty_property.get_or_default(), 0);
1817    /// ```
1818    pub fn get_or_default(&self) -> T {
1819        self.get().unwrap_or_default()
1820    }
1821}
1822
1823impl<T: Clone> Clone for ObservableProperty<T> {
1824    /// Creates a new reference to the same observable property
1825    ///
1826    /// This creates a new `ObservableProperty` instance that shares the same
1827    /// underlying data with the original. Changes made through either instance
1828    /// will be visible to observers subscribed through both instances.
1829    ///
1830    /// # Examples
1831    ///
1832    /// ```rust
1833    /// use observable_property::ObservableProperty;
1834    /// use std::sync::Arc;
1835    ///
1836    /// let property1 = ObservableProperty::new(42);
1837    /// let property2 = property1.clone();
1838    ///
1839    /// property2.subscribe(Arc::new(|old, new| {
1840    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
1841    /// })).map_err(|e| {
1842    ///     eprintln!("Failed to subscribe: {}", e);
1843    ///     e
1844    /// })?;
1845    ///
1846    /// // This change through property1 will trigger the observer on property2
1847    /// property1.set(100).map_err(|e| {
1848    ///     eprintln!("Failed to set value: {}", e);
1849    ///     e
1850    /// })?;
1851    /// # Ok::<(), observable_property::PropertyError>(())
1852    /// ```
1853    fn clone(&self) -> Self {
1854        Self {
1855            inner: Arc::clone(&self.inner),
1856            max_threads: self.max_threads,
1857            max_observers: self.max_observers,
1858        }
1859    }
1860}
1861
1862impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1863    /// Debug implementation that shows the current value if accessible
1864    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1865        match self.get() {
1866            Ok(value) => f
1867                .debug_struct("ObservableProperty")
1868                .field("value", &value)
1869                .field("observers_count", &"[hidden]")
1870                .field("max_threads", &self.max_threads)
1871                .field("max_observers", &self.max_observers)
1872                .finish(),
1873            Err(_) => f
1874                .debug_struct("ObservableProperty")
1875                .field("value", &"[inaccessible]")
1876                .field("observers_count", &"[hidden]")
1877                .field("max_threads", &self.max_threads)
1878                .field("max_observers", &self.max_observers)
1879                .finish(),
1880        }
1881    }
1882}
1883
1884#[cfg(test)]
1885mod tests {
1886    use super::*;
1887    use std::sync::atomic::{AtomicUsize, Ordering};
1888    use std::time::Duration;
1889
1890    #[test]
1891    fn test_property_creation_and_basic_operations() {
1892        let prop = ObservableProperty::new(42);
1893
1894        // Test initial value
1895        match prop.get() {
1896            Ok(value) => assert_eq!(value, 42),
1897            Err(e) => panic!("Failed to get initial value: {}", e),
1898        }
1899
1900        // Test setting value
1901        if let Err(e) = prop.set(100) {
1902            panic!("Failed to set value: {}", e);
1903        }
1904
1905        match prop.get() {
1906            Ok(value) => assert_eq!(value, 100),
1907            Err(e) => panic!("Failed to get updated value: {}", e),
1908        }
1909    }
1910
1911    #[test]
1912    fn test_observer_subscription_and_notification() {
1913        let prop = ObservableProperty::new("initial".to_string());
1914        let notification_count = Arc::new(AtomicUsize::new(0));
1915        let last_old_value = Arc::new(RwLock::new(String::new()));
1916        let last_new_value = Arc::new(RwLock::new(String::new()));
1917
1918        let count_clone = notification_count.clone();
1919        let old_clone = last_old_value.clone();
1920        let new_clone = last_new_value.clone();
1921
1922        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
1923            count_clone.fetch_add(1, Ordering::SeqCst);
1924            if let Ok(mut old_val) = old_clone.write() {
1925                *old_val = old.clone();
1926            }
1927            if let Ok(mut new_val) = new_clone.write() {
1928                *new_val = new.clone();
1929            }
1930        })) {
1931            Ok(id) => id,
1932            Err(e) => panic!("Failed to subscribe observer: {}", e),
1933        };
1934
1935        // Change value and verify notification
1936        if let Err(e) = prop.set("changed".to_string()) {
1937            panic!("Failed to set property value: {}", e);
1938        }
1939
1940        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1941
1942        match last_old_value.read() {
1943            Ok(old_val) => assert_eq!(*old_val, "initial"),
1944            Err(e) => panic!("Failed to read old value: {:?}", e),
1945        }
1946
1947        match last_new_value.read() {
1948            Ok(new_val) => assert_eq!(*new_val, "changed"),
1949            Err(e) => panic!("Failed to read new value: {:?}", e),
1950        }
1951
1952        // Test unsubscription
1953        match prop.unsubscribe(observer_id) {
1954            Ok(was_present) => assert!(was_present),
1955            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
1956        }
1957
1958        // Change value again - should not notify
1959        if let Err(e) = prop.set("not_notified".to_string()) {
1960            panic!("Failed to set property value after unsubscribe: {}", e);
1961        }
1962        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1963    }
1964
1965    #[test]
1966    fn test_filtered_observer() {
1967        let prop = ObservableProperty::new(0i32);
1968        let notification_count = Arc::new(AtomicUsize::new(0));
1969        let count_clone = notification_count.clone();
1970
1971        // Observer only triggered when value increases
1972        let observer_id = match prop.subscribe_filtered(
1973            Arc::new(move |_, _| {
1974                count_clone.fetch_add(1, Ordering::SeqCst);
1975            }),
1976            |old, new| new > old,
1977        ) {
1978            Ok(id) => id,
1979            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
1980        };
1981
1982        // Should trigger (0 -> 5)
1983        if let Err(e) = prop.set(5) {
1984            panic!("Failed to set property value to 5: {}", e);
1985        }
1986        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1987
1988        // Should NOT trigger (5 -> 3)
1989        if let Err(e) = prop.set(3) {
1990            panic!("Failed to set property value to 3: {}", e);
1991        }
1992        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1993
1994        // Should trigger (3 -> 10)
1995        if let Err(e) = prop.set(10) {
1996            panic!("Failed to set property value to 10: {}", e);
1997        }
1998        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
1999
2000        match prop.unsubscribe(observer_id) {
2001            Ok(_) => {}
2002            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
2003        }
2004    }
2005
2006    #[test]
2007    fn test_thread_safety_concurrent_reads() {
2008        let prop = Arc::new(ObservableProperty::new(42i32));
2009        let num_threads = 10;
2010        let reads_per_thread = 100;
2011
2012        let handles: Vec<_> = (0..num_threads)
2013            .map(|_| {
2014                let prop_clone = prop.clone();
2015                thread::spawn(move || {
2016                    for _ in 0..reads_per_thread {
2017                        match prop_clone.get() {
2018                            Ok(value) => assert_eq!(value, 42),
2019                            Err(e) => panic!("Failed to read property value: {}", e),
2020                        }
2021                        thread::sleep(Duration::from_millis(1));
2022                    }
2023                })
2024            })
2025            .collect();
2026
2027        for handle in handles {
2028            if let Err(e) = handle.join() {
2029                panic!("Thread failed to complete: {:?}", e);
2030            }
2031        }
2032    }
2033
2034    #[test]
2035    fn test_async_set_performance() {
2036        let prop = ObservableProperty::new(0i32);
2037        let slow_observer_count = Arc::new(AtomicUsize::new(0));
2038        let count_clone = slow_observer_count.clone();
2039
2040        // Add observer that simulates slow work
2041        let _id = match prop.subscribe(Arc::new(move |_, _| {
2042            thread::sleep(Duration::from_millis(50));
2043            count_clone.fetch_add(1, Ordering::SeqCst);
2044        })) {
2045            Ok(id) => id,
2046            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
2047        };
2048
2049        // Test synchronous set (should be slow)
2050        let start = std::time::Instant::now();
2051        if let Err(e) = prop.set(1) {
2052            panic!("Failed to set property value synchronously: {}", e);
2053        }
2054        let sync_duration = start.elapsed();
2055
2056        // Test asynchronous set (should be fast)
2057        let start = std::time::Instant::now();
2058        if let Err(e) = prop.set_async(2) {
2059            panic!("Failed to set property value asynchronously: {}", e);
2060        }
2061        let async_duration = start.elapsed();
2062
2063        // Async should be much faster than sync
2064        assert!(async_duration < sync_duration);
2065        assert!(async_duration.as_millis() < 10); // Should be very fast
2066
2067        // Wait for async observer to complete
2068        thread::sleep(Duration::from_millis(100));
2069
2070        // Both observers should have been called
2071        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
2072    }
2073
2074    #[test]
2075    fn test_lock_poisoning() {
2076        // Create a property that we'll poison
2077        let prop = Arc::new(ObservableProperty::new(0));
2078        let prop_clone = prop.clone();
2079
2080        // Create a thread that will deliberately poison the lock
2081        let poison_thread = thread::spawn(move || {
2082            // Get write lock and then panic, which will poison the lock
2083            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2084            panic!("Deliberate panic to poison the lock");
2085        });
2086
2087        // Wait for the thread to complete (it will panic)
2088        let _ = poison_thread.join();
2089
2090        // With graceful degradation, operations should succeed even with poisoned locks
2091        // The implementation recovers the inner value using into_inner()
2092        
2093        // get() should succeed by recovering from poisoned lock
2094        match prop.get() {
2095            Ok(value) => assert_eq!(value, 0), // Should recover the value
2096            Err(e) => panic!("get() should succeed with graceful degradation, got error: {:?}", e),
2097        }
2098
2099        // set() should succeed by recovering from poisoned lock
2100        match prop.set(42) {
2101            Ok(_) => {}, // Expected success with graceful degradation
2102            Err(e) => panic!("set() should succeed with graceful degradation, got error: {:?}", e),
2103        }
2104        
2105        // Verify the value was actually set
2106        assert_eq!(prop.get().expect("Failed to get value after set"), 42);
2107
2108        // subscribe() should succeed by recovering from poisoned lock
2109        match prop.subscribe(Arc::new(|_, _| {})) {
2110            Ok(_) => {}, // Expected success with graceful degradation
2111            Err(e) => panic!("subscribe() should succeed with graceful degradation, got error: {:?}", e),
2112        }
2113    }
2114
2115    #[test]
2116    fn test_observer_panic_isolation() {
2117        let prop = ObservableProperty::new(0);
2118        let call_counts = Arc::new(AtomicUsize::new(0));
2119
2120        // First observer will panic
2121        let panic_observer_id = prop
2122            .subscribe(Arc::new(|_, _| {
2123                panic!("This observer deliberately panics");
2124            }))
2125            .expect("Failed to subscribe panic observer");
2126
2127        // Second observer should still be called despite first one panicking
2128        let counts = call_counts.clone();
2129        let normal_observer_id = prop
2130            .subscribe(Arc::new(move |_, _| {
2131                counts.fetch_add(1, Ordering::SeqCst);
2132            }))
2133            .expect("Failed to subscribe normal observer");
2134
2135        // Trigger the observers - this shouldn't panic despite the first observer panicking
2136        prop.set(42).expect("Failed to set property value");
2137
2138        // Verify the second observer was still called
2139        assert_eq!(call_counts.load(Ordering::SeqCst), 1);
2140
2141        // Clean up
2142        prop.unsubscribe(panic_observer_id).expect("Failed to unsubscribe panic observer");
2143        prop.unsubscribe(normal_observer_id).expect("Failed to unsubscribe normal observer");
2144    }
2145
2146    #[test]
2147    fn test_unsubscribe_nonexistent_observer() {
2148        let property = ObservableProperty::new(0);
2149
2150        // Generate a valid observer ID
2151        let valid_id = property.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe test observer");
2152
2153        // Create an ID that doesn't exist (valid_id + 1000 should not exist)
2154        let nonexistent_id = valid_id + 1000;
2155
2156        // Test unsubscribing a nonexistent observer
2157        match property.unsubscribe(nonexistent_id) {
2158            Ok(was_present) => {
2159                assert!(
2160                    !was_present,
2161                    "Unsubscribe should return false for nonexistent ID"
2162                );
2163            }
2164            Err(e) => panic!("Unsubscribe returned error: {:?}", e),
2165        }
2166
2167        // Also verify that unsubscribing twice returns false the second time
2168        property.unsubscribe(valid_id).expect("Failed first unsubscribe"); // First unsubscribe should return true
2169
2170        let result = property.unsubscribe(valid_id).expect("Failed second unsubscribe");
2171        assert!(!result, "Second unsubscribe should return false");
2172    }
2173
2174    #[test]
2175    fn test_observer_id_wraparound() {
2176        let prop = ObservableProperty::new(0);
2177
2178        // Test that observer IDs increment properly and don't wrap around unexpectedly
2179        let id1 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 1");
2180        let id2 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 2");
2181        let id3 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 3");
2182
2183        assert!(id2 > id1, "Observer IDs should increment");
2184        assert!(id3 > id2, "Observer IDs should continue incrementing");
2185        assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
2186        assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");
2187
2188        // Clean up
2189        prop.unsubscribe(id1).expect("Failed to unsubscribe observer 1");
2190        prop.unsubscribe(id2).expect("Failed to unsubscribe observer 2");
2191        prop.unsubscribe(id3).expect("Failed to unsubscribe observer 3");
2192    }
2193
2194    #[test]
2195    fn test_concurrent_subscribe_unsubscribe() {
2196        let prop = Arc::new(ObservableProperty::new(0));
2197        let num_threads = 8;
2198        let operations_per_thread = 100;
2199
2200        let handles: Vec<_> = (0..num_threads)
2201            .map(|thread_id| {
2202                let prop_clone = prop.clone();
2203                thread::spawn(move || {
2204                    let mut local_ids = Vec::new();
2205
2206                    for i in 0..operations_per_thread {
2207                        // Subscribe an observer
2208                        let observer_id = prop_clone
2209                            .subscribe(Arc::new(move |old, new| {
2210                                // Do some work to simulate real observer
2211                                let _ = thread_id + i + old + new;
2212                            }))
2213                            .expect("Subscribe should succeed");
2214
2215                        local_ids.push(observer_id);
2216
2217                        // Occasionally unsubscribe some observers
2218                        if i % 10 == 0 && !local_ids.is_empty() {
2219                            let idx = i % local_ids.len();
2220                            let id_to_remove = local_ids.remove(idx);
2221                            prop_clone
2222                                .unsubscribe(id_to_remove)
2223                                .expect("Unsubscribe should succeed");
2224                        }
2225                    }
2226
2227                    // Clean up remaining observers
2228                    for id in local_ids {
2229                        prop_clone
2230                            .unsubscribe(id)
2231                            .expect("Final cleanup should succeed");
2232                    }
2233                })
2234            })
2235            .collect();
2236
2237        // Wait for all threads to complete
2238        for handle in handles {
2239            handle.join().expect("Thread should complete successfully");
2240        }
2241
2242        // Property should still be functional
2243        prop.set(42)
2244            .expect("Property should still work after concurrent operations");
2245    }
2246
2247    #[test]
2248    fn test_multiple_observer_panics_isolation() {
2249        let prop = ObservableProperty::new(0);
2250        let successful_calls = Arc::new(AtomicUsize::new(0));
2251
2252        // Create multiple observers that will panic
2253        let _panic_id1 = prop
2254            .subscribe(Arc::new(|_, _| {
2255                panic!("First panic observer");
2256            }))
2257            .expect("Failed to subscribe first panic observer");
2258
2259        let _panic_id2 = prop
2260            .subscribe(Arc::new(|_, _| {
2261                panic!("Second panic observer");
2262            }))
2263            .expect("Failed to subscribe second panic observer");
2264
2265        // Create observers that should succeed despite the panics
2266        let count1 = successful_calls.clone();
2267        let _success_id1 = prop
2268            .subscribe(Arc::new(move |_, _| {
2269                count1.fetch_add(1, Ordering::SeqCst);
2270            }))
2271            .expect("Failed to subscribe first success observer");
2272
2273        let count2 = successful_calls.clone();
2274        let _success_id2 = prop
2275            .subscribe(Arc::new(move |_, _| {
2276                count2.fetch_add(1, Ordering::SeqCst);
2277            }))
2278            .expect("Failed to subscribe second success observer");
2279
2280        // Trigger all observers
2281        prop.set(42).expect("Failed to set property value for panic isolation test");
2282
2283        // Both successful observers should have been called despite the panics
2284        assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
2285    }
2286
2287    #[test]
2288    fn test_async_observer_panic_isolation() {
2289        let prop = ObservableProperty::new(0);
2290        let successful_calls = Arc::new(AtomicUsize::new(0));
2291
2292        // Create observer that will panic
2293        let _panic_id = prop
2294            .subscribe(Arc::new(|_, _| {
2295                panic!("Async panic observer");
2296            }))
2297            .expect("Failed to subscribe async panic observer");
2298
2299        // Create observer that should succeed
2300        let count = successful_calls.clone();
2301        let _success_id = prop
2302            .subscribe(Arc::new(move |_, _| {
2303                count.fetch_add(1, Ordering::SeqCst);
2304            }))
2305            .expect("Failed to subscribe async success observer");
2306
2307        // Use async set to trigger observers in background threads
2308        prop.set_async(42).expect("Failed to set property value asynchronously");
2309
2310        // Wait for async observers to complete
2311        thread::sleep(Duration::from_millis(100));
2312
2313        // The successful observer should have been called despite the panic
2314        assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
2315    }
2316
2317    #[test]
2318    fn test_very_large_observer_count() {
2319        let prop = ObservableProperty::new(0);
2320        let total_calls = Arc::new(AtomicUsize::new(0));
2321        let observer_count = 1000;
2322
2323        // Subscribe many observers
2324        let mut observer_ids = Vec::with_capacity(observer_count);
2325        for i in 0..observer_count {
2326            let count = total_calls.clone();
2327            let id = prop
2328                .subscribe(Arc::new(move |old, new| {
2329                    count.fetch_add(1, Ordering::SeqCst);
2330                    // Verify we got the right values
2331                    assert_eq!(*old, 0);
2332                    assert_eq!(*new, i + 1);
2333                }))
2334                .expect("Failed to subscribe large observer count test observer");
2335            observer_ids.push(id);
2336        }
2337
2338        // Trigger all observers
2339        prop.set(observer_count).expect("Failed to set property value for large observer count test");
2340
2341        // All observers should have been called
2342        assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
2343
2344        // Clean up
2345        for id in observer_ids {
2346            prop.unsubscribe(id).expect("Failed to unsubscribe observer in large count test");
2347        }
2348    }
2349
2350    #[test]
2351    fn test_observer_with_mutable_state() {
2352        let prop = ObservableProperty::new(0);
2353        let call_history = Arc::new(RwLock::new(Vec::new()));
2354
2355        let history = call_history.clone();
2356        let observer_id = prop
2357            .subscribe(Arc::new(move |old, new| {
2358                if let Ok(mut hist) = history.write() {
2359                    hist.push((*old, *new));
2360                }
2361            }))
2362            .expect("Failed to subscribe mutable state observer");
2363
2364        // Make several changes
2365        prop.set(1).expect("Failed to set property to 1");
2366        prop.set(2).expect("Failed to set property to 2");
2367        prop.set(3).expect("Failed to set property to 3");
2368
2369        // Verify the history was recorded correctly
2370        let history = call_history.read().expect("Failed to read call history");
2371        assert_eq!(history.len(), 3);
2372        assert_eq!(history[0], (0, 1));
2373        assert_eq!(history[1], (1, 2));
2374        assert_eq!(history[2], (2, 3));
2375
2376        prop.unsubscribe(observer_id).expect("Failed to unsubscribe mutable state observer");
2377    }
2378
2379    #[test]
2380    fn test_subscription_automatic_cleanup() {
2381        let prop = ObservableProperty::new(0);
2382        let call_count = Arc::new(AtomicUsize::new(0));
2383
2384        // Test that subscription automatically cleans up when dropped
2385        {
2386            let count = call_count.clone();
2387            let _subscription = prop
2388                .subscribe_with_subscription(Arc::new(move |_, _| {
2389                    count.fetch_add(1, Ordering::SeqCst);
2390                }))
2391                .expect("Failed to create subscription for automatic cleanup test");
2392
2393            // Observer should be active while subscription is in scope
2394            prop.set(1).expect("Failed to set property value in subscription test");
2395            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2396
2397            // Subscription goes out of scope here and should auto-cleanup
2398        }
2399
2400        // Observer should no longer be active after subscription dropped
2401        prop.set(2).expect("Failed to set property value after subscription dropped");
2402        assert_eq!(call_count.load(Ordering::SeqCst), 1); // No additional calls
2403    }
2404
2405    #[test]
2406    fn test_subscription_explicit_drop() {
2407        let prop = ObservableProperty::new(0);
2408        let call_count = Arc::new(AtomicUsize::new(0));
2409
2410        let count = call_count.clone();
2411        let subscription = prop
2412            .subscribe_with_subscription(Arc::new(move |_, _| {
2413                count.fetch_add(1, Ordering::SeqCst);
2414            }))
2415            .expect("Failed to create subscription for explicit drop test");
2416
2417        // Observer should be active
2418        prop.set(1).expect("Failed to set property value before explicit drop");
2419        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2420
2421        // Explicitly drop the subscription
2422        drop(subscription);
2423
2424        // Observer should no longer be active
2425        prop.set(2).expect("Failed to set property value after explicit drop");
2426        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2427    }
2428
2429    #[test]
2430    fn test_multiple_subscriptions_with_cleanup() {
2431        let prop = ObservableProperty::new(0);
2432        let call_count1 = Arc::new(AtomicUsize::new(0));
2433        let call_count2 = Arc::new(AtomicUsize::new(0));
2434        let call_count3 = Arc::new(AtomicUsize::new(0));
2435
2436        let count1 = call_count1.clone();
2437        let count2 = call_count2.clone();
2438        let count3 = call_count3.clone();
2439
2440        let subscription1 = prop
2441            .subscribe_with_subscription(Arc::new(move |_, _| {
2442                count1.fetch_add(1, Ordering::SeqCst);
2443            }))
2444            .expect("Failed to create first subscription");
2445
2446        let subscription2 = prop
2447            .subscribe_with_subscription(Arc::new(move |_, _| {
2448                count2.fetch_add(1, Ordering::SeqCst);
2449            }))
2450            .expect("Failed to create second subscription");
2451
2452        let subscription3 = prop
2453            .subscribe_with_subscription(Arc::new(move |_, _| {
2454                count3.fetch_add(1, Ordering::SeqCst);
2455            }))
2456            .expect("Failed to create third subscription");
2457
2458        // All observers should be active
2459        prop.set(1).expect("Failed to set property value with all subscriptions");
2460        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
2461        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2462        assert_eq!(call_count3.load(Ordering::SeqCst), 1);
2463
2464        // Drop second subscription
2465        drop(subscription2);
2466
2467        // Only first and third should be active
2468        prop.set(2).expect("Failed to set property value with partial subscriptions");
2469        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2470        assert_eq!(call_count2.load(Ordering::SeqCst), 1); // No change
2471        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2472
2473        // Drop remaining subscriptions
2474        drop(subscription1);
2475        drop(subscription3);
2476
2477        // No observers should be active
2478        prop.set(3).expect("Failed to set property value with no subscriptions");
2479        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
2480        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
2481        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
2482    }
2483
2484    #[test]
2485    fn test_subscription_drop_with_poisoned_lock() {
2486        let prop = Arc::new(ObservableProperty::new(0));
2487        let prop_clone = prop.clone();
2488
2489        // Create a subscription
2490        let call_count = Arc::new(AtomicUsize::new(0));
2491        let count = call_count.clone();
2492        let subscription = prop
2493            .subscribe_with_subscription(Arc::new(move |_, _| {
2494                count.fetch_add(1, Ordering::SeqCst);
2495            }))
2496            .expect("Failed to create subscription for poisoned lock test");
2497
2498        // Poison the lock by panicking while holding a write lock
2499        let poison_thread = thread::spawn(move || {
2500            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2501            panic!("Deliberate panic to poison the lock");
2502        });
2503        let _ = poison_thread.join(); // Ignore the panic result
2504
2505        // Dropping the subscription should not panic even with poisoned lock
2506        // This tests that the Drop implementation handles poisoned locks gracefully
2507        drop(subscription); // Should complete without panic
2508
2509        // Test passes if we reach here without panicking
2510    }
2511
2512    #[test]
2513    fn test_subscription_vs_manual_unsubscribe() {
2514        let prop = ObservableProperty::new(0);
2515        let auto_count = Arc::new(AtomicUsize::new(0));
2516        let manual_count = Arc::new(AtomicUsize::new(0));
2517
2518        // Manual subscription
2519        let manual_count_clone = manual_count.clone();
2520        let manual_id = prop
2521            .subscribe(Arc::new(move |_, _| {
2522                manual_count_clone.fetch_add(1, Ordering::SeqCst);
2523            }))
2524            .expect("Failed to create manual subscription");
2525
2526        // Automatic subscription
2527        let auto_count_clone = auto_count.clone();
2528        let _auto_subscription = prop
2529            .subscribe_with_subscription(Arc::new(move |_, _| {
2530                auto_count_clone.fetch_add(1, Ordering::SeqCst);
2531            }))
2532            .expect("Failed to create automatic subscription");
2533
2534        // Both should be active
2535        prop.set(1).expect("Failed to set property value with both subscriptions");
2536        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2537        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2538
2539        // Manual unsubscribe
2540        prop.unsubscribe(manual_id).expect("Failed to manually unsubscribe");
2541
2542        // Only automatic subscription should be active
2543        prop.set(2).expect("Failed to set property value after manual unsubscribe");
2544        assert_eq!(manual_count.load(Ordering::SeqCst), 1); // No change
2545        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2546
2547        // Auto subscription goes out of scope here and cleans up automatically
2548    }
2549
2550    #[test]
2551    fn test_subscribe_with_subscription_error_handling() {
2552        let prop = Arc::new(ObservableProperty::new(0));
2553        let prop_clone = prop.clone();
2554
2555        // Poison the lock
2556        let poison_thread = thread::spawn(move || {
2557            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2558            panic!("Deliberate panic to poison the lock");
2559        });
2560        let _ = poison_thread.join();
2561
2562        // With graceful degradation, subscribe_with_subscription should succeed
2563        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2564        assert!(result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
2565    }
2566
2567    #[test]
2568    fn test_subscription_with_property_cloning() {
2569        let prop1 = ObservableProperty::new(0);
2570        let prop2 = prop1.clone();
2571        let call_count = Arc::new(AtomicUsize::new(0));
2572
2573        // Subscribe to prop1
2574        let count = call_count.clone();
2575        let _subscription = prop1
2576            .subscribe_with_subscription(Arc::new(move |_, _| {
2577                count.fetch_add(1, Ordering::SeqCst);
2578            }))
2579            .expect("Failed to create subscription for cloned property test");
2580
2581        // Changes through prop2 should trigger the observer subscribed to prop1
2582        prop2.set(42).expect("Failed to set property value through prop2");
2583        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2584
2585        // Changes through prop1 should also trigger the observer
2586        prop1.set(100).expect("Failed to set property value through prop1");
2587        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2588    }
2589
2590    #[test]
2591    fn test_subscription_in_conditional_blocks() {
2592        let prop = ObservableProperty::new(0);
2593        let call_count = Arc::new(AtomicUsize::new(0));
2594
2595        let should_subscribe = true;
2596
2597        if should_subscribe {
2598            let count = call_count.clone();
2599            let _subscription = prop
2600                .subscribe_with_subscription(Arc::new(move |_, _| {
2601                    count.fetch_add(1, Ordering::SeqCst);
2602                }))
2603                .expect("Failed to create subscription in conditional block");
2604
2605            // Observer active within this block
2606            prop.set(1).expect("Failed to set property value in conditional block");
2607            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2608
2609            // Subscription dropped when exiting this block
2610        }
2611
2612        // Observer should be inactive now
2613        prop.set(2).expect("Failed to set property value after conditional block");
2614        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2615    }
2616
2617    #[test]
2618    fn test_subscription_with_early_return() {
2619        fn test_function(
2620            prop: &ObservableProperty<i32>,
2621            should_return_early: bool,
2622        ) -> Result<(), PropertyError> {
2623            let call_count = Arc::new(AtomicUsize::new(0));
2624            let count = call_count.clone();
2625
2626            let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2627                count.fetch_add(1, Ordering::SeqCst);
2628            }))?;
2629
2630            prop.set(1)?;
2631            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2632
2633            if should_return_early {
2634                return Ok(()); // Subscription should be cleaned up here
2635            }
2636
2637            prop.set(2)?;
2638            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2639
2640            Ok(())
2641            // Subscription cleaned up when function exits normally
2642        }
2643
2644        let prop = ObservableProperty::new(0);
2645
2646        // Test early return
2647        test_function(&prop, true).expect("Failed to test early return");
2648
2649        // Verify observer is no longer active after early return
2650        prop.set(10).expect("Failed to set property value after early return");
2651
2652        // Test normal exit
2653        test_function(&prop, false).expect("Failed to test normal exit");
2654
2655        // Verify observer is no longer active after normal exit
2656        prop.set(20).expect("Failed to set property value after normal exit");
2657    }
2658
2659    #[test]
2660    fn test_subscription_move_semantics() {
2661        let prop = ObservableProperty::new(0);
2662        let call_count = Arc::new(AtomicUsize::new(0));
2663
2664        let count = call_count.clone();
2665        let subscription = prop
2666            .subscribe_with_subscription(Arc::new(move |_, _| {
2667                count.fetch_add(1, Ordering::SeqCst);
2668            }))
2669            .expect("Failed to create subscription for move semantics test");
2670
2671        // Observer should be active
2672        prop.set(1).expect("Failed to set property value before move");
2673        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2674
2675        // Move subscription to a new variable
2676        let moved_subscription = subscription;
2677
2678        // Observer should still be active after move
2679        prop.set(2).expect("Failed to set property value after move");
2680        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2681
2682        // Drop the moved subscription
2683        drop(moved_subscription);
2684
2685        // Observer should now be inactive
2686        prop.set(3).expect("Failed to set property value after moved subscription drop");
2687        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2688    }
2689
2690    #[test]
2691    fn test_filtered_subscription_automatic_cleanup() {
2692        let prop = ObservableProperty::new(0);
2693        let call_count = Arc::new(AtomicUsize::new(0));
2694
2695        {
2696            let count = call_count.clone();
2697            let _subscription = prop
2698                .subscribe_filtered_with_subscription(
2699                    Arc::new(move |_, _| {
2700                        count.fetch_add(1, Ordering::SeqCst);
2701                    }),
2702                    |old, new| new > old, // Only trigger on increases
2703                )
2704                .expect("Failed to create filtered subscription");
2705
2706            // Should trigger (0 -> 5)
2707            prop.set(5).expect("Failed to set property value to 5 in filtered test");
2708            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2709
2710            // Should NOT trigger (5 -> 3)
2711            prop.set(3).expect("Failed to set property value to 3 in filtered test");
2712            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2713
2714            // Should trigger (3 -> 10)
2715            prop.set(10).expect("Failed to set property value to 10 in filtered test");
2716            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2717
2718            // Subscription goes out of scope here
2719        }
2720
2721        // Observer should be inactive after subscription cleanup
2722        prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
2723        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2724    }
2725
2726    #[test]
2727    fn test_multiple_filtered_subscriptions() {
2728        let prop = ObservableProperty::new(10);
2729        let increase_count = Arc::new(AtomicUsize::new(0));
2730        let decrease_count = Arc::new(AtomicUsize::new(0));
2731        let significant_change_count = Arc::new(AtomicUsize::new(0));
2732
2733        let inc_count = increase_count.clone();
2734        let dec_count = decrease_count.clone();
2735        let sig_count = significant_change_count.clone();
2736
2737        let _increase_sub = prop
2738            .subscribe_filtered_with_subscription(
2739                Arc::new(move |_, _| {
2740                    inc_count.fetch_add(1, Ordering::SeqCst);
2741                }),
2742                |old, new| new > old,
2743            )
2744            .expect("Failed to create increase subscription");
2745
2746        let _decrease_sub = prop
2747            .subscribe_filtered_with_subscription(
2748                Arc::new(move |_, _| {
2749                    dec_count.fetch_add(1, Ordering::SeqCst);
2750                }),
2751                |old, new| new < old,
2752            )
2753            .expect("Failed to create decrease subscription");
2754
2755        let _significant_sub = prop
2756            .subscribe_filtered_with_subscription(
2757                Arc::new(move |_, _| {
2758                    sig_count.fetch_add(1, Ordering::SeqCst);
2759                }),
2760                |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
2761            )
2762            .expect("Failed to create significant change subscription");
2763
2764        // Test increases
2765        prop.set(15).expect("Failed to set property to 15 in multiple filtered test"); // +5: triggers increase, not significant
2766        assert_eq!(increase_count.load(Ordering::SeqCst), 1);
2767        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2768        assert_eq!(significant_change_count.load(Ordering::SeqCst), 0);
2769
2770        // Test significant increase
2771        prop.set(25).expect("Failed to set property to 25 in multiple filtered test"); // +10: triggers increase and significant
2772        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2773        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2774        assert_eq!(significant_change_count.load(Ordering::SeqCst), 1);
2775
2776        // Test significant decrease
2777        prop.set(5).expect("Failed to set property to 5 in multiple filtered test"); // -20: triggers decrease and significant
2778        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2779        assert_eq!(decrease_count.load(Ordering::SeqCst), 1);
2780        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2781
2782        // Test small decrease
2783        prop.set(3).expect("Failed to set property to 3 in multiple filtered test"); // -2: triggers decrease, not significant
2784        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2785        assert_eq!(decrease_count.load(Ordering::SeqCst), 2);
2786        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2787
2788        // All subscriptions auto-cleanup when they go out of scope
2789    }
2790
2791    #[test]
2792    fn test_filtered_subscription_complex_filter() {
2793        let prop = ObservableProperty::new(0.0f64);
2794        let call_count = Arc::new(AtomicUsize::new(0));
2795        let values_received = Arc::new(RwLock::new(Vec::new()));
2796
2797        let count = call_count.clone();
2798        let values = values_received.clone();
2799        let _subscription = prop
2800            .subscribe_filtered_with_subscription(
2801                Arc::new(move |old, new| {
2802                    count.fetch_add(1, Ordering::SeqCst);
2803                    if let Ok(mut v) = values.write() {
2804                        v.push((*old, *new));
2805                    }
2806                }),
2807                |old, new| {
2808                    // Complex filter: trigger only when crossing integer boundaries
2809                    // and the change is significant (> 0.5)
2810                    let old_int = old.floor() as i32;
2811                    let new_int = new.floor() as i32;
2812                    old_int != new_int && (new - old).abs() > 0.5_f64
2813                },
2814            )
2815            .expect("Failed to create complex filtered subscription");
2816
2817        // Small changes within same integer - should not trigger
2818        prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
2819        prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
2820        assert_eq!(call_count.load(Ordering::SeqCst), 0);
2821
2822        // Cross integer boundary with significant change - should trigger
2823        prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test"); // Change of 0.6, which is > 0.5
2824        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2825
2826        // Small cross-boundary change - should not trigger
2827        prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
2828        prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test"); // Change of 0.2, less than 0.5
2829        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2830
2831        // Large cross-boundary change - should trigger
2832        prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
2833        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2834
2835        // Verify received values
2836        let values = values_received.read().expect("Failed to read values in complex filter test");
2837        assert_eq!(values.len(), 2);
2838        assert_eq!(values[0], (0.7, 1.3));
2839        assert_eq!(values[1], (2.1, 3.5));
2840    }
2841
2842    #[test]
2843    fn test_filtered_subscription_error_handling() {
2844        let prop = Arc::new(ObservableProperty::new(0));
2845        let prop_clone = prop.clone();
2846
2847        // Poison the lock
2848        let poison_thread = thread::spawn(move || {
2849            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
2850            panic!("Deliberate panic to poison the lock");
2851        });
2852        let _ = poison_thread.join();
2853
2854        // With graceful degradation, subscribe_filtered_with_subscription should succeed
2855        let result = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2856        assert!(result.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");
2857    }
2858
2859    #[test]
2860    fn test_filtered_subscription_vs_manual_filtered() {
2861        let prop = ObservableProperty::new(0);
2862        let auto_count = Arc::new(AtomicUsize::new(0));
2863        let manual_count = Arc::new(AtomicUsize::new(0));
2864
2865        // Manual filtered subscription
2866        let manual_count_clone = manual_count.clone();
2867        let manual_id = prop
2868            .subscribe_filtered(
2869                Arc::new(move |_, _| {
2870                    manual_count_clone.fetch_add(1, Ordering::SeqCst);
2871                }),
2872                |old, new| new > old,
2873            )
2874            .expect("Failed to create manual filtered subscription");
2875
2876        // Automatic filtered subscription
2877        let auto_count_clone = auto_count.clone();
2878        let _auto_subscription = prop
2879            .subscribe_filtered_with_subscription(
2880                Arc::new(move |_, _| {
2881                    auto_count_clone.fetch_add(1, Ordering::SeqCst);
2882                }),
2883                |old, new| new > old,
2884            )
2885            .expect("Failed to create automatic filtered subscription");
2886
2887        // Both should be triggered by increases
2888        prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
2889        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2890        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2891
2892        // Neither should be triggered by decreases
2893        prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
2894        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2895        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2896
2897        // Both should be triggered by increases again
2898        prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
2899        assert_eq!(manual_count.load(Ordering::SeqCst), 2);
2900        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2901
2902        // Manual cleanup
2903        prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");
2904
2905        // Only automatic subscription should be active
2906        prop.set(15).expect("Failed to set property to 15 after manual cleanup");
2907        assert_eq!(manual_count.load(Ordering::SeqCst), 2); // No change
2908        assert_eq!(auto_count.load(Ordering::SeqCst), 3);
2909
2910        // Auto subscription cleaned up when it goes out of scope
2911    }
2912
2913    #[test]
2914    fn test_filtered_subscription_with_panicking_filter() {
2915        let prop = ObservableProperty::new(0);
2916        let call_count = Arc::new(AtomicUsize::new(0));
2917
2918        let count = call_count.clone();
2919        let _subscription = prop
2920            .subscribe_filtered_with_subscription(
2921                Arc::new(move |_, _| {
2922                    count.fetch_add(1, Ordering::SeqCst);
2923                }),
2924                |_, new| {
2925                    if *new == 42 {
2926                        panic!("Filter panic on 42");
2927                    }
2928                    true // Accept all other values
2929                },
2930            )
2931            .expect("Failed to create panicking filter subscription");
2932
2933        // Normal value should work
2934        prop.set(1).expect("Failed to set property to 1 in panicking filter test");
2935        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2936
2937        // Value that causes filter to panic should be handled gracefully
2938        // The behavior here depends on how the filter panic is handled
2939        // In the current implementation, filter panics may cause the observer to not be called
2940        prop.set(42).expect("Failed to set property to 42 in panicking filter test");
2941
2942        // Observer should still work for subsequent normal values
2943        prop.set(2).expect("Failed to set property to 2 after filter panic");
2944        // Note: The exact count here depends on panic handling implementation
2945        // The important thing is that the system doesn't crash
2946    }
2947
2948    #[test]
2949    fn test_subscription_thread_safety() {
2950        let prop = Arc::new(ObservableProperty::new(0));
2951        let num_threads = 8;
2952        let operations_per_thread = 50;
2953        let total_calls = Arc::new(AtomicUsize::new(0));
2954
2955        let handles: Vec<_> = (0..num_threads)
2956            .map(|thread_id| {
2957                let prop_clone = prop.clone();
2958                let calls_clone = total_calls.clone();
2959
2960                thread::spawn(move || {
2961                    let mut local_subscriptions = Vec::new();
2962
2963                    for i in 0..operations_per_thread {
2964                        let calls = calls_clone.clone();
2965                        let subscription = prop_clone
2966                            .subscribe_with_subscription(Arc::new(move |old, new| {
2967                                calls.fetch_add(1, Ordering::SeqCst);
2968                                // Simulate some work
2969                                let _ = thread_id + i + old + new;
2970                            }))
2971                            .expect("Should be able to create subscription");
2972
2973                        local_subscriptions.push(subscription);
2974
2975                        // Trigger observers
2976                        prop_clone
2977                            .set(thread_id * 1000 + i)
2978                            .expect("Should be able to set value");
2979
2980                        // Occasionally drop some subscriptions
2981                        if i % 5 == 0 && !local_subscriptions.is_empty() {
2982                            local_subscriptions.remove(0); // Drop first subscription
2983                        }
2984                    }
2985
2986                    // All remaining subscriptions dropped when vector goes out of scope
2987                })
2988            })
2989            .collect();
2990
2991        // Wait for all threads to complete
2992        for handle in handles {
2993            handle.join().expect("Thread should complete successfully");
2994        }
2995
2996        // Property should still be functional after all the concurrent operations
2997        prop.set(9999).expect("Property should still work");
2998
2999        // We can't easily verify the exact call count due to the complex timing,
3000        // but we can verify that the system didn't crash and is still operational
3001        println!(
3002            "Total observer calls: {}",
3003            total_calls.load(Ordering::SeqCst)
3004        );
3005    }
3006
3007    #[test]
3008    fn test_subscription_cross_thread_drop() {
3009        let prop = Arc::new(ObservableProperty::new(0));
3010        let call_count = Arc::new(AtomicUsize::new(0));
3011
3012        // Create subscription in main thread
3013        let count = call_count.clone();
3014        let subscription = prop
3015            .subscribe_with_subscription(Arc::new(move |_, _| {
3016                count.fetch_add(1, Ordering::SeqCst);
3017            }))
3018            .expect("Failed to create subscription for cross-thread drop test");
3019
3020        // Verify observer is active
3021        prop.set(1).expect("Failed to set property value in cross-thread drop test");
3022        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3023
3024        // Move subscription to another thread and drop it there
3025        let prop_clone = prop.clone();
3026        let call_count_clone = call_count.clone();
3027
3028        let handle = thread::spawn(move || {
3029            // Verify observer is still active in the other thread
3030            prop_clone.set(2).expect("Failed to set property value in other thread");
3031            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2);
3032
3033            // Drop subscription in this thread
3034            drop(subscription);
3035
3036            // Verify observer is no longer active
3037            prop_clone.set(3).expect("Failed to set property value after drop in other thread");
3038            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2); // No change
3039        });
3040
3041        handle.join().expect("Failed to join cross-thread drop test thread");
3042
3043        // Verify observer is still inactive in main thread
3044        prop.set(4).expect("Failed to set property value after thread join");
3045        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3046    }
3047
3048    #[test]
3049    fn test_concurrent_subscription_creation_and_property_changes() {
3050        let prop = Arc::new(ObservableProperty::new(0));
3051        let total_notifications = Arc::new(AtomicUsize::new(0));
3052        let num_subscriber_threads = 4;
3053        let num_setter_threads = 2;
3054        let operations_per_thread = 25;
3055
3056        // Threads that create and destroy subscriptions
3057        let subscriber_handles: Vec<_> = (0..num_subscriber_threads)
3058            .map(|_| {
3059                let prop_clone = prop.clone();
3060                let notifications_clone = total_notifications.clone();
3061
3062                thread::spawn(move || {
3063                    for _ in 0..operations_per_thread {
3064                        let notifications = notifications_clone.clone();
3065                        let _subscription = prop_clone
3066                            .subscribe_with_subscription(Arc::new(move |_, _| {
3067                                notifications.fetch_add(1, Ordering::SeqCst);
3068                            }))
3069                            .expect("Should create subscription");
3070
3071                        // Keep subscription alive for a short time
3072                        thread::sleep(Duration::from_millis(1));
3073
3074                        // Subscription dropped when _subscription goes out of scope
3075                    }
3076                })
3077            })
3078            .collect();
3079
3080        // Threads that continuously change the property value
3081        let setter_handles: Vec<_> = (0..num_setter_threads)
3082            .map(|thread_id| {
3083                let prop_clone = prop.clone();
3084
3085                thread::spawn(move || {
3086                    for i in 0..operations_per_thread * 2 {
3087                        prop_clone
3088                            .set(thread_id * 10000 + i)
3089                            .expect("Should set value");
3090                        thread::sleep(Duration::from_millis(1));
3091                    }
3092                })
3093            })
3094            .collect();
3095
3096        // Wait for all threads to complete
3097        for handle in subscriber_handles
3098            .into_iter()
3099            .chain(setter_handles.into_iter())
3100        {
3101            handle.join().expect("Thread should complete");
3102        }
3103
3104        // System should be stable after concurrent operations
3105        prop.set(99999).expect("Property should still work");
3106
3107        println!(
3108            "Total notifications during concurrent test: {}",
3109            total_notifications.load(Ordering::SeqCst)
3110        );
3111    }
3112
3113    #[test]
3114    fn test_filtered_subscription_thread_safety() {
3115        let prop = Arc::new(ObservableProperty::new(0));
3116        let increase_notifications = Arc::new(AtomicUsize::new(0));
3117        let decrease_notifications = Arc::new(AtomicUsize::new(0));
3118        let num_threads = 6;
3119
3120        let handles: Vec<_> = (0..num_threads)
3121            .map(|thread_id| {
3122                let prop_clone = prop.clone();
3123                let inc_notifications = increase_notifications.clone();
3124                let dec_notifications = decrease_notifications.clone();
3125
3126                thread::spawn(move || {
3127                    // Create increase-only subscription
3128                    let inc_count = inc_notifications.clone();
3129                    let _inc_subscription = prop_clone
3130                        .subscribe_filtered_with_subscription(
3131                            Arc::new(move |_, _| {
3132                                inc_count.fetch_add(1, Ordering::SeqCst);
3133                            }),
3134                            |old, new| new > old,
3135                        )
3136                        .expect("Should create filtered subscription");
3137
3138                    // Create decrease-only subscription
3139                    let dec_count = dec_notifications.clone();
3140                    let _dec_subscription = prop_clone
3141                        .subscribe_filtered_with_subscription(
3142                            Arc::new(move |_, _| {
3143                                dec_count.fetch_add(1, Ordering::SeqCst);
3144                            }),
3145                            |old, new| new < old,
3146                        )
3147                        .expect("Should create filtered subscription");
3148
3149                    // Perform some property changes
3150                    let base_value = thread_id * 100;
3151                    for i in 0..20 {
3152                        let new_value = base_value + (i % 10); // Creates increases and decreases
3153                        prop_clone.set(new_value).expect("Should set value");
3154                        thread::sleep(Duration::from_millis(1));
3155                    }
3156
3157                    // Subscriptions automatically cleaned up when going out of scope
3158                })
3159            })
3160            .collect();
3161
3162        // Wait for all threads
3163        for handle in handles {
3164            handle.join().expect("Thread should complete");
3165        }
3166
3167        // Verify system is still operational
3168        let initial_inc = increase_notifications.load(Ordering::SeqCst);
3169        let initial_dec = decrease_notifications.load(Ordering::SeqCst);
3170
3171        prop.set(1000).expect("Property should still work");
3172        prop.set(2000).expect("Property should still work");
3173
3174        // No new notifications should occur (all subscriptions cleaned up)
3175        assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
3176        assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);
3177
3178        println!(
3179            "Increase notifications: {}, Decrease notifications: {}",
3180            initial_inc, initial_dec
3181        );
3182    }
3183
3184    #[test]
3185    fn test_subscription_with_async_property_changes() {
3186        let prop = Arc::new(ObservableProperty::new(0));
3187        let sync_notifications = Arc::new(AtomicUsize::new(0));
3188        let async_notifications = Arc::new(AtomicUsize::new(0));
3189
3190        // Subscription that tracks sync notifications
3191        let sync_count = sync_notifications.clone();
3192        let _sync_subscription = prop
3193            .subscribe_with_subscription(Arc::new(move |old, new| {
3194                sync_count.fetch_add(1, Ordering::SeqCst);
3195                // Simulate slow observer work
3196                thread::sleep(Duration::from_millis(5));
3197                println!("Sync observer: {} -> {}", old, new);
3198            }))
3199            .expect("Failed to create sync subscription");
3200
3201        // Subscription that tracks async notifications
3202        let async_count = async_notifications.clone();
3203        let _async_subscription = prop
3204            .subscribe_with_subscription(Arc::new(move |old, new| {
3205                async_count.fetch_add(1, Ordering::SeqCst);
3206                println!("Async observer: {} -> {}", old, new);
3207            }))
3208            .expect("Failed to create async subscription");
3209
3210        // Test sync property changes
3211        let start = std::time::Instant::now();
3212        prop.set(1).expect("Failed to set property value 1 in async test");
3213        prop.set(2).expect("Failed to set property value 2 in async test");
3214        let sync_duration = start.elapsed();
3215
3216        // Test async property changes
3217        let start = std::time::Instant::now();
3218        prop.set_async(3).expect("Failed to set property value 3 async");
3219        prop.set_async(4).expect("Failed to set property value 4 async");
3220        let async_duration = start.elapsed();
3221
3222        // Async should be much faster
3223        assert!(async_duration < sync_duration);
3224
3225        // Wait for async observers to complete
3226        thread::sleep(Duration::from_millis(50));
3227
3228        // All observers should have been called
3229        assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
3230        assert_eq!(async_notifications.load(Ordering::SeqCst), 4);
3231
3232        // Subscriptions auto-cleanup when going out of scope
3233    }
3234
3235    #[test]
3236    fn test_subscription_creation_with_poisoned_lock() {
3237        let prop = Arc::new(ObservableProperty::new(0));
3238        let prop_clone = prop.clone();
3239
3240        // Create a valid subscription before poisoning
3241        let call_count = Arc::new(AtomicUsize::new(0));
3242        let count = call_count.clone();
3243        let existing_subscription = prop
3244            .subscribe_with_subscription(Arc::new(move |_, _| {
3245                count.fetch_add(1, Ordering::SeqCst);
3246            }))
3247            .expect("Failed to create subscription before poisoning");
3248
3249        // Poison the lock
3250        let poison_thread = thread::spawn(move || {
3251            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for subscription poison test");
3252            panic!("Deliberate panic to poison the lock");
3253        });
3254        let _ = poison_thread.join();
3255
3256        // With graceful degradation, new subscription creation should succeed
3257        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
3258        assert!(result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
3259
3260        // New filtered subscription creation should also succeed
3261        let filtered_result =
3262            prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
3263        assert!(filtered_result.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");
3264
3265        // Dropping existing subscription should not panic
3266        drop(existing_subscription);
3267    }
3268
3269    #[test]
3270    fn test_subscription_cleanup_behavior_with_poisoned_lock() {
3271        // This test specifically verifies that Drop doesn't panic with poisoned locks
3272        let prop = Arc::new(ObservableProperty::new(0));
3273        let call_count = Arc::new(AtomicUsize::new(0));
3274
3275        // Create subscription
3276        let count = call_count.clone();
3277        let subscription = prop
3278            .subscribe_with_subscription(Arc::new(move |_, _| {
3279                count.fetch_add(1, Ordering::SeqCst);
3280            }))
3281            .expect("Failed to create subscription for cleanup behavior test");
3282
3283        // Verify it works initially
3284        prop.set(1).expect("Failed to set property value in cleanup behavior test");
3285        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3286
3287        // Poison the lock from another thread
3288        let prop_clone = prop.clone();
3289        let poison_thread = thread::spawn(move || {
3290            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for cleanup behavior poison test");
3291            panic!("Deliberate panic to poison the lock");
3292        });
3293        let _ = poison_thread.join();
3294
3295        // Now drop the subscription - this should NOT panic
3296        // The Drop implementation should handle the poisoned lock gracefully
3297        drop(subscription);
3298
3299        // Test succeeds if we reach this point without panicking
3300    }
3301
3302    #[test]
3303    fn test_multiple_subscription_cleanup_with_poisoned_lock() {
3304        let prop = Arc::new(ObservableProperty::new(0));
3305        let mut subscriptions = Vec::new();
3306
3307        // Create multiple subscriptions
3308        for i in 0..5 {
3309            let call_count = Arc::new(AtomicUsize::new(0));
3310            let count = call_count.clone();
3311            let subscription = prop
3312                .subscribe_with_subscription(Arc::new(move |old, new| {
3313                    count.fetch_add(1, Ordering::SeqCst);
3314                    println!("Observer {}: {} -> {}", i, old, new);
3315                }))
3316                .expect("Failed to create subscription in multiple cleanup test");
3317            subscriptions.push(subscription);
3318        }
3319
3320        // Verify they all work
3321        prop.set(42).expect("Failed to set property value in multiple cleanup test");
3322
3323        // Poison the lock
3324        let prop_clone = prop.clone();
3325        let poison_thread = thread::spawn(move || {
3326            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for multiple cleanup poison test");
3327            panic!("Deliberate panic to poison the lock");
3328        });
3329        let _ = poison_thread.join();
3330
3331        // Drop all subscriptions - none should panic
3332        for subscription in subscriptions {
3333            drop(subscription);
3334        }
3335
3336        // Test succeeds if no panics occurred
3337    }
3338
3339    #[test]
3340    fn test_subscription_behavior_before_and_after_poison() {
3341        let prop = Arc::new(ObservableProperty::new(0));
3342        let before_poison_count = Arc::new(AtomicUsize::new(0));
3343        let after_poison_count = Arc::new(AtomicUsize::new(0));
3344
3345        // Create subscription before poisoning
3346        let before_count = before_poison_count.clone();
3347        let before_subscription = prop
3348            .subscribe_with_subscription(Arc::new(move |_, _| {
3349                before_count.fetch_add(1, Ordering::SeqCst);
3350            }))
3351            .expect("Failed to create subscription before poison test");
3352
3353        // Verify it works
3354        prop.set(1).expect("Failed to set property value before poison test");
3355        assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);
3356
3357        // Poison the lock
3358        let prop_clone = prop.clone();
3359        let poison_thread = thread::spawn(move || {
3360            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for before/after poison test");
3361            panic!("Deliberate panic to poison the lock");
3362        });
3363        let _ = poison_thread.join();
3364
3365        // With graceful degradation, subscription creation after poisoning should succeed
3366        let after_count = after_poison_count.clone();
3367        let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
3368            after_count.fetch_add(1, Ordering::SeqCst);
3369        }));
3370        assert!(after_result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");
3371        
3372        let _after_subscription = after_result.unwrap();
3373
3374        // Clean up the before-poison subscription - should not panic
3375        drop(before_subscription);
3376    }
3377
3378    #[test]
3379    fn test_concurrent_subscription_drops_with_poison() {
3380        let prop = Arc::new(ObservableProperty::new(0));
3381        let num_subscriptions = 10;
3382        let mut subscriptions = Vec::new();
3383
3384        // Create multiple subscriptions
3385        for i in 0..num_subscriptions {
3386            let call_count = Arc::new(AtomicUsize::new(0));
3387            let count = call_count.clone();
3388            let subscription = prop
3389                .subscribe_with_subscription(Arc::new(move |_, _| {
3390                    count.fetch_add(1, Ordering::SeqCst);
3391                    println!("Observer {}", i);
3392                }))
3393                .expect("Failed to create subscription in concurrent drops test");
3394            subscriptions.push(subscription);
3395        }
3396
3397        // Poison the lock
3398        let prop_clone = prop.clone();
3399        let poison_thread = thread::spawn(move || {
3400            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for concurrent drops poison test");
3401            panic!("Deliberate panic to poison the lock");
3402        });
3403        let _ = poison_thread.join();
3404
3405        // Drop subscriptions concurrently from multiple threads
3406        let handles: Vec<_> = subscriptions
3407            .into_iter()
3408            .enumerate()
3409            .map(|(i, subscription)| {
3410                thread::spawn(move || {
3411                    // Add some randomness to timing
3412                    thread::sleep(Duration::from_millis(i as u64 % 5));
3413                    drop(subscription);
3414                    println!("Dropped subscription {}", i);
3415                })
3416            })
3417            .collect();
3418
3419        // Wait for all drops to complete
3420        for handle in handles {
3421            handle
3422                .join()
3423                .expect("Drop thread should complete without panic");
3424        }
3425
3426        // Test succeeds if all threads completed successfully
3427    }
3428
3429    
3430    #[test]
3431    fn test_with_max_threads_creation() {
3432        // Test creation with various thread counts
3433        let prop1 = ObservableProperty::with_max_threads(42, 1);
3434        let prop2 = ObservableProperty::with_max_threads("test".to_string(), 8);
3435        let prop3 = ObservableProperty::with_max_threads(0.5_f64, 16);
3436
3437        // Verify initial values are correct
3438        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 42);
3439        assert_eq!(prop2.get().expect("Failed to get prop2 value"), "test");
3440        assert_eq!(prop3.get().expect("Failed to get prop3 value"), 0.5);
3441    }
3442
3443    #[test]
3444    fn test_with_max_threads_zero_defaults_to_max_threads() {
3445        // Test that zero max_threads defaults to MAX_THREADS (4)
3446        let prop1 = ObservableProperty::with_max_threads(100, 0);
3447        let prop2 = ObservableProperty::new(100); // Uses default MAX_THREADS
3448
3449        // Both should have the same max_threads value
3450        // We can't directly access max_threads, but we can verify behavior is consistent
3451        assert_eq!(prop1.get().expect("Failed to get prop1 value"), 100);
3452        assert_eq!(prop2.get().expect("Failed to get prop2 value"), 100);
3453    }
3454
3455    #[test]
3456    fn test_with_max_threads_basic_functionality() {
3457        let prop = ObservableProperty::with_max_threads(0, 2);
3458        let call_count = Arc::new(AtomicUsize::new(0));
3459
3460        // Subscribe an observer
3461        let count = call_count.clone();
3462        let _subscription = prop
3463            .subscribe_with_subscription(Arc::new(move |old, new| {
3464                count.fetch_add(1, Ordering::SeqCst);
3465                assert_eq!(*old, 0);
3466                assert_eq!(*new, 42);
3467            }))
3468            .expect("Failed to create subscription for max_threads test");
3469
3470        // Test synchronous set
3471        prop.set(42).expect("Failed to set value synchronously");
3472        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3473
3474        // Test asynchronous set
3475        prop.set_async(43).expect("Failed to set value asynchronously");
3476        
3477        // Wait for async observers to complete
3478        thread::sleep(Duration::from_millis(50));
3479        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3480    }
3481
3482    #[test]
3483    fn test_with_max_threads_async_performance() {
3484        // Test that with_max_threads affects async performance
3485        let prop = ObservableProperty::with_max_threads(0, 1); // Single thread
3486        let slow_call_count = Arc::new(AtomicUsize::new(0));
3487
3488        // Add multiple slow observers
3489        let mut subscriptions = Vec::new();
3490        for _ in 0..4 {
3491            let count = slow_call_count.clone();
3492            let subscription = prop
3493                .subscribe_with_subscription(Arc::new(move |_, _| {
3494                    thread::sleep(Duration::from_millis(25)); // Simulate slow work
3495                    count.fetch_add(1, Ordering::SeqCst);
3496                }))
3497                .expect("Failed to create slow observer subscription");
3498            subscriptions.push(subscription);
3499        }
3500
3501        // Measure time for async notification
3502        let start = std::time::Instant::now();
3503        prop.set_async(42).expect("Failed to set value asynchronously");
3504        let async_duration = start.elapsed();
3505
3506        // Should return quickly even with slow observers
3507        assert!(async_duration.as_millis() < 50, "Async set should return quickly");
3508
3509        // Wait for all observers to complete
3510        thread::sleep(Duration::from_millis(200));
3511        assert_eq!(slow_call_count.load(Ordering::SeqCst), 4);
3512    }
3513
3514    #[test]
3515    fn test_with_max_threads_vs_regular_constructor() {
3516        let prop_regular = ObservableProperty::new(42);
3517        let prop_custom = ObservableProperty::with_max_threads(42, 4); // Same as default
3518
3519        let count_regular = Arc::new(AtomicUsize::new(0));
3520        let count_custom = Arc::new(AtomicUsize::new(0));
3521
3522        // Both should behave identically
3523        let count1 = count_regular.clone();
3524        let _sub1 = prop_regular
3525            .subscribe_with_subscription(Arc::new(move |_, _| {
3526                count1.fetch_add(1, Ordering::SeqCst);
3527            }))
3528            .expect("Failed to create regular subscription");
3529
3530        let count2 = count_custom.clone();
3531        let _sub2 = prop_custom
3532            .subscribe_with_subscription(Arc::new(move |_, _| {
3533                count2.fetch_add(1, Ordering::SeqCst);
3534            }))
3535            .expect("Failed to create custom subscription");
3536
3537        // Test sync behavior
3538        prop_regular.set(100).expect("Failed to set regular property");
3539        prop_custom.set(100).expect("Failed to set custom property");
3540
3541        assert_eq!(count_regular.load(Ordering::SeqCst), 1);
3542        assert_eq!(count_custom.load(Ordering::SeqCst), 1);
3543
3544        // Test async behavior
3545        prop_regular.set_async(200).expect("Failed to set regular property async");
3546        prop_custom.set_async(200).expect("Failed to set custom property async");
3547
3548        thread::sleep(Duration::from_millis(50));
3549        assert_eq!(count_regular.load(Ordering::SeqCst), 2);
3550        assert_eq!(count_custom.load(Ordering::SeqCst), 2);
3551    }
3552
3553    #[test]
3554    fn test_with_max_threads_large_values() {
3555        // Test with very large max_threads values
3556        let prop = ObservableProperty::with_max_threads(0, 1000);
3557        let call_count = Arc::new(AtomicUsize::new(0));
3558
3559        // Add a few observers
3560        let count = call_count.clone();
3561        let _subscription = prop
3562            .subscribe_with_subscription(Arc::new(move |_, _| {
3563                count.fetch_add(1, Ordering::SeqCst);
3564            }))
3565            .expect("Failed to create subscription for large max_threads test");
3566
3567        // Should work normally even with excessive thread limit
3568        prop.set_async(42).expect("Failed to set value with large max_threads");
3569
3570        thread::sleep(Duration::from_millis(50));
3571        assert_eq!(call_count.load(Ordering::SeqCst), 1);
3572    }
3573
3574    #[test]
3575    fn test_with_max_threads_clone_behavior() {
3576        let prop1 = ObservableProperty::with_max_threads(42, 2);
3577        let prop2 = prop1.clone();
3578
3579        let call_count1 = Arc::new(AtomicUsize::new(0));
3580        let call_count2 = Arc::new(AtomicUsize::new(0));
3581
3582        // Subscribe to both properties
3583        let count1 = call_count1.clone();
3584        let _sub1 = prop1
3585            .subscribe_with_subscription(Arc::new(move |_, _| {
3586                count1.fetch_add(1, Ordering::SeqCst);
3587            }))
3588            .expect("Failed to create subscription for cloned property test");
3589
3590        let count2 = call_count2.clone();
3591        let _sub2 = prop2
3592            .subscribe_with_subscription(Arc::new(move |_, _| {
3593                count2.fetch_add(1, Ordering::SeqCst);
3594            }))
3595            .expect("Failed to create subscription for original property test");
3596
3597        // Changes through either property should trigger both observers
3598        prop1.set_async(100).expect("Failed to set value through prop1");
3599        thread::sleep(Duration::from_millis(50));
3600        
3601        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
3602        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
3603
3604        prop2.set_async(200).expect("Failed to set value through prop2");
3605        thread::sleep(Duration::from_millis(50));
3606        
3607        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
3608        assert_eq!(call_count2.load(Ordering::SeqCst), 2);
3609    }
3610
3611    #[test]
3612    fn test_with_max_threads_thread_safety() {
3613        let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
3614        let call_count = Arc::new(AtomicUsize::new(0));
3615
3616        // Add observers from multiple threads
3617        let handles: Vec<_> = (0..5)
3618            .map(|thread_id| {
3619                let prop_clone = prop.clone();
3620                let count_clone = call_count.clone();
3621
3622                thread::spawn(move || {
3623                    let count = count_clone.clone();
3624                    let _subscription = prop_clone
3625                        .subscribe_with_subscription(Arc::new(move |_, _| {
3626                            count.fetch_add(1, Ordering::SeqCst);
3627                        }))
3628                        .expect("Failed to create thread-safe subscription");
3629
3630                    // Trigger async notifications from this thread
3631                    prop_clone
3632                        .set_async(thread_id * 10)
3633                        .expect("Failed to set value from thread");
3634                        
3635                    thread::sleep(Duration::from_millis(10));
3636                })
3637            })
3638            .collect();
3639
3640        // Wait for all threads to complete
3641        for handle in handles {
3642            handle.join().expect("Thread should complete successfully");
3643        }
3644
3645        // Wait for all async operations to complete
3646        thread::sleep(Duration::from_millis(100));
3647
3648        // Each set_async should trigger all active observers at that time
3649        // The exact count depends on timing, but should be > 0
3650        assert!(call_count.load(Ordering::SeqCst) > 0);
3651    }
3652
3653    #[test]
3654    fn test_with_max_threads_error_handling() {
3655        let prop = ObservableProperty::with_max_threads(42, 2);
3656        
3657        // Test that error handling works the same as regular properties
3658        let _subscription = prop
3659            .subscribe_with_subscription(Arc::new(|_, _| {
3660                // Normal observer
3661            }))
3662            .expect("Failed to create subscription for error handling test");
3663
3664        // Should handle errors gracefully
3665        assert!(prop.set(100).is_ok());
3666        assert!(prop.set_async(200).is_ok());
3667        assert_eq!(prop.get().expect("Failed to get value after error test"), 200);
3668    }
3669
3670    #[test]
3671    fn test_observer_limit_enforcement() {
3672        let prop = ObservableProperty::new(0);
3673        let mut observer_ids = Vec::new();
3674
3675        // Add observers up to the limit (using a small test to avoid slow tests)
3676        // In reality, MAX_OBSERVERS is 10,000, but we'll test the mechanism
3677        // by adding a reasonable number and then checking the error message
3678        for i in 0..100 {
3679            let result = prop.subscribe(Arc::new(move |_, _| {
3680                let _ = i; // Use the capture to make each observer unique
3681            }));
3682            assert!(result.is_ok(), "Should be able to add observer {}", i);
3683            observer_ids.push(result.unwrap());
3684        }
3685
3686        // Verify all observers were added
3687        assert_eq!(observer_ids.len(), 100);
3688
3689        // Verify we can still add more (we're well under the 10,000 limit)
3690        let result = prop.subscribe(Arc::new(|_, _| {}));
3691        assert!(result.is_ok());
3692    }
3693
3694    #[test]
3695    fn test_observer_limit_error_message() {
3696        let prop = ObservableProperty::new(0);
3697        
3698        // We can't easily test hitting the actual 10,000 limit in a unit test
3699        // (it would be too slow), but we can verify the error type exists
3700        // and the subscribe method has the check
3701        
3702        // Add a few observers successfully
3703        for _ in 0..10 {
3704            assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3705        }
3706
3707        // The mechanism is in place - the limit check happens before insertion
3708        // In production, if 10,000 observers are added, the 10,001st will fail
3709    }
3710
3711    #[test]
3712    fn test_observer_limit_with_unsubscribe() {
3713        let prop = ObservableProperty::new(0);
3714        
3715        // Add observers
3716        let mut ids = Vec::new();
3717        for _ in 0..50 {
3718            ids.push(prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe"));
3719        }
3720
3721        // Remove half of them
3722        for id in ids.iter().take(25) {
3723            assert!(prop.unsubscribe(*id).expect("Failed to unsubscribe"));
3724        }
3725
3726        // Should be able to add more observers after unsubscribing
3727        for _ in 0..30 {
3728            assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3729        }
3730    }
3731
3732    #[test]
3733    fn test_observer_limit_with_raii_subscriptions() {
3734        let prop = ObservableProperty::new(0);
3735        
3736        // Create RAII subscriptions
3737        let mut subscriptions = Vec::new();
3738        for _ in 0..50 {
3739            subscriptions.push(
3740                prop.subscribe_with_subscription(Arc::new(|_, _| {}))
3741                    .expect("Failed to create subscription")
3742            );
3743        }
3744
3745        // Drop half of them (automatic cleanup)
3746        subscriptions.truncate(25);
3747
3748        // Should be able to add more after RAII cleanup
3749        for _ in 0..30 {
3750            let _sub = prop.subscribe_with_subscription(Arc::new(|_, _| {}))
3751                .expect("Failed to create subscription after RAII cleanup");
3752        }
3753    }
3754
3755    #[test]
3756    fn test_filtered_subscription_respects_observer_limit() {
3757        let prop = ObservableProperty::new(0);
3758        
3759        // Add regular and filtered observers
3760        for i in 0..50 {
3761            if i % 2 == 0 {
3762                assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
3763            } else {
3764                assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok());
3765            }
3766        }
3767
3768        // Both types count toward the limit
3769        // Should still be well under the 10,000 limit
3770        assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok());
3771    }
3772
3773    #[test]
3774    fn test_observer_limit_concurrent_subscriptions() {
3775        let prop = Arc::new(ObservableProperty::new(0));
3776        let success_count = Arc::new(AtomicUsize::new(0));
3777
3778        // Try to add observers from multiple threads
3779        let handles: Vec<_> = (0..10)
3780            .map(|_| {
3781                let prop_clone = prop.clone();
3782                let count_clone = success_count.clone();
3783                
3784                thread::spawn(move || {
3785                    for _ in 0..10 {
3786                        if prop_clone.subscribe(Arc::new(|_, _| {})).is_ok() {
3787                            count_clone.fetch_add(1, Ordering::SeqCst);
3788                        }
3789                        thread::sleep(Duration::from_micros(10));
3790                    }
3791                })
3792            })
3793            .collect();
3794
3795        for handle in handles {
3796            handle.join().expect("Thread should complete");
3797        }
3798
3799        // All 100 subscriptions should succeed (well under limit)
3800        assert_eq!(success_count.load(Ordering::SeqCst), 100);
3801    }
3802
3803    #[test]
3804    fn test_notify_observers_batch_releases_lock_early() {
3805        use std::sync::atomic::AtomicBool;
3806        
3807        let prop = Arc::new(ObservableProperty::new(0));
3808        let call_count = Arc::new(AtomicUsize::new(0));
3809        let started = Arc::new(AtomicBool::new(false));
3810        
3811        // Subscribe with a slow observer
3812        let started_clone = started.clone();
3813        let count_clone = call_count.clone();
3814        prop.subscribe(Arc::new(move |_, _| {
3815            started_clone.store(true, Ordering::SeqCst);
3816            count_clone.fetch_add(1, Ordering::SeqCst);
3817            // Simulate slow observer
3818            thread::sleep(Duration::from_millis(50));
3819        })).expect("Failed to subscribe");
3820        
3821        // Start batch notification in background
3822        let prop_clone = prop.clone();
3823        let batch_handle = thread::spawn(move || {
3824            prop_clone.notify_observers_batch(vec![(0, 1), (1, 2)]).expect("Failed to notify batch");
3825        });
3826        
3827        // Wait for observer to start
3828        while !started.load(Ordering::SeqCst) {
3829            thread::sleep(Duration::from_millis(1));
3830        }
3831        
3832        // Now verify we can still subscribe while observer is running
3833        // This proves the lock was released before observer execution
3834        let subscribe_result = prop.subscribe(Arc::new(|_, _| {
3835            // New observer
3836        }));
3837        
3838        assert!(subscribe_result.is_ok(), "Should be able to subscribe while batch notification is in progress");
3839        
3840        // Wait for batch to complete
3841        batch_handle.join().expect("Batch thread should complete");
3842        
3843        // Verify observers were called (2 changes in batch)
3844        assert_eq!(call_count.load(Ordering::SeqCst), 2);
3845    }
3846
3847    #[test]
3848    fn test_notify_observers_batch_panic_isolation() {
3849        let prop = ObservableProperty::new(0);
3850        let good_observer_count = Arc::new(AtomicUsize::new(0));
3851        let count_clone = good_observer_count.clone();
3852        
3853        // First observer that panics
3854        prop.subscribe(Arc::new(|_, _| {
3855            panic!("Deliberate panic in batch observer");
3856        })).expect("Failed to subscribe panicking observer");
3857        
3858        // Second observer that should still be called
3859        prop.subscribe(Arc::new(move |_, _| {
3860            count_clone.fetch_add(1, Ordering::SeqCst);
3861        })).expect("Failed to subscribe good observer");
3862        
3863        // Batch notification should not fail despite panic
3864        let result = prop.notify_observers_batch(vec![(0, 1), (1, 2), (2, 3)]);
3865        assert!(result.is_ok());
3866        
3867        // Second observer should have been called for all 3 changes
3868        assert_eq!(good_observer_count.load(Ordering::SeqCst), 3);
3869    }
3870
3871    #[test]
3872    fn test_notify_observers_batch_multiple_changes() {
3873        let prop = ObservableProperty::new(0);
3874        let received_changes = Arc::new(RwLock::new(Vec::new()));
3875        let changes_clone = received_changes.clone();
3876        
3877        prop.subscribe(Arc::new(move |old, new| {
3878            if let Ok(mut changes) = changes_clone.write() {
3879                changes.push((*old, *new));
3880            }
3881        })).expect("Failed to subscribe");
3882        
3883        // Send multiple changes
3884        prop.notify_observers_batch(vec![
3885            (0, 10),
3886            (10, 20),
3887            (20, 30),
3888            (30, 40),
3889        ]).expect("Failed to notify batch");
3890        
3891        let changes = received_changes.read().expect("Failed to read changes");
3892        assert_eq!(changes.len(), 4);
3893        assert_eq!(changes[0], (0, 10));
3894        assert_eq!(changes[1], (10, 20));
3895        assert_eq!(changes[2], (20, 30));
3896        assert_eq!(changes[3], (30, 40));
3897    }
3898
3899    #[test]
3900    fn test_notify_observers_batch_empty() {
3901        let prop = ObservableProperty::new(0);
3902        let call_count = Arc::new(AtomicUsize::new(0));
3903        let count_clone = call_count.clone();
3904        
3905        prop.subscribe(Arc::new(move |_, _| {
3906            count_clone.fetch_add(1, Ordering::SeqCst);
3907        })).expect("Failed to subscribe");
3908        
3909        // Empty batch should succeed without calling observers
3910        prop.notify_observers_batch(vec![]).expect("Failed with empty batch");
3911        
3912        assert_eq!(call_count.load(Ordering::SeqCst), 0);
3913    }
3914}