observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
15//!
16//! ## Quick Start
17//!
18//! ```rust
19//! use observable_property::ObservableProperty;
20//! use std::sync::Arc;
21//!
22//! // Create an observable property
23//! let property = ObservableProperty::new(42);
24//!
25//! // Subscribe to changes
26//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
27//!     println!("Value changed from {} to {}", old_value, new_value);
28//! })).map_err(|e| {
29//!     eprintln!("Failed to subscribe: {}", e);
30//!     e
31//! })?;
32//!
33//! // Change the value (triggers observer)
34//! property.set(100).map_err(|e| {
35//!     eprintln!("Failed to set value: {}", e);
36//!     e
37//! })?;
38//!
39//! // Unsubscribe when done
40//! property.unsubscribe(observer_id).map_err(|e| {
41//!     eprintln!("Failed to unsubscribe: {}", e);
42//!     e
43//! })?;
44//! # Ok::<(), observable_property::PropertyError>(())
45//! ```
46//!
47//! ## Multi-threading Example
48//!
49//! ```rust
50//! use observable_property::ObservableProperty;
51//! use std::sync::Arc;
52//! use std::thread;
53//!
54//! let property = Arc::new(ObservableProperty::new(0));
55//! let property_clone = property.clone();
56//!
57//! // Subscribe from one thread
58//! property.subscribe(Arc::new(|old, new| {
59//!     println!("Value changed: {} -> {}", old, new);
60//! })).map_err(|e| {
61//!     eprintln!("Failed to subscribe: {}", e);
62//!     e
63//! })?;
64//!
65//! // Modify from another thread
66//! thread::spawn(move || {
67//!     if let Err(e) = property_clone.set(42) {
68//!         eprintln!("Failed to set value: {}", e);
69//!     }
70//! }).join().expect("Thread panicked");
71//! # Ok::<(), observable_property::PropertyError>(())
72//! ```
73//!
74//! ## RAII Subscriptions (Recommended)
75//!
76//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
77//!
78//! ```rust
79//! use observable_property::ObservableProperty;
80//! use std::sync::Arc;
81//!
82//! # fn main() -> Result<(), observable_property::PropertyError> {
83//! let property = ObservableProperty::new(0);
84//!
85//! {
86//!     // Create RAII subscription - automatically cleaned up when dropped
87//!     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
88//!         println!("Value changed: {} -> {}", old, new);
89//!     }))?;
90//!
91//!     property.set(42)?; // Prints: "Value changed: 0 -> 42"
92//!
93//!     // Subscription automatically unsubscribes when leaving this scope
94//! }
95//!
96//! // No observer active anymore
97//! property.set(100)?; // No output
98//! # Ok(())
99//! # }
100//! ```
101//!
102//! ## Filtered RAII Subscriptions
103//!
104//! Combine filtering with automatic cleanup for conditional monitoring:
105//!
106//! ```rust
107//! use observable_property::ObservableProperty;
108//! use std::sync::Arc;
109//!
110//! # fn main() -> Result<(), observable_property::PropertyError> {
111//! let temperature = ObservableProperty::new(20.0);
112//!
113//! {
114//!     // Monitor only significant temperature increases with automatic cleanup
115//!     let _heat_warning = temperature.subscribe_filtered_with_subscription(
116//!         Arc::new(|old, new| {
117//!             println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
118//!         }),
119//!         |old, new| new > old && (new - old) > 5.0
120//!     )?;
121//!
122//!     temperature.set(22.0)?; // No warning (only 2°C increase)
123//!     temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
124//!
125//!     // Subscription automatically cleaned up here
126//! }
127//!
128//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
129//! # Ok(())
130//! # }
131//! ```
132//!
133//! ## Subscription Management Comparison
134//!
135//! ```rust
136//! use observable_property::ObservableProperty;
137//! use std::sync::Arc;
138//!
139//! # fn main() -> Result<(), observable_property::PropertyError> {
140//! let property = ObservableProperty::new(0);
141//! let observer = Arc::new(|old: &i32, new: &i32| {
142//!     println!("Value: {} -> {}", old, new);
143//! });
144//!
145//! // Method 1: Manual subscription management
146//! let observer_id = property.subscribe(observer.clone())?;
147//! property.set(42)?;
148//! property.unsubscribe(observer_id)?; // Manual cleanup required
149//!
150//! // Method 2: RAII subscription management (recommended)
151//! {
152//!     let _subscription = property.subscribe_with_subscription(observer)?;
153//!     property.set(100)?;
154//!     // Automatic cleanup when _subscription goes out of scope
155//! }
156//! # Ok(())
157//! # }
158//! ```
159//!
160//! ## Advanced RAII Patterns
161//!
162//! Comprehensive example showing various RAII subscription patterns:
163//!
164//! ```rust
165//! use observable_property::ObservableProperty;
166//! use std::sync::Arc;
167//!
168//! # fn main() -> Result<(), observable_property::PropertyError> {
169//! // System monitoring example
170//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
171//! let memory_usage = ObservableProperty::new(1024); // MB
172//! let active_connections = ObservableProperty::new(0u32);
173//!
174//! // Conditional monitoring based on system state
175//! let high_load_monitoring = cpu_usage.get()? > 50.0;
176//!
177//! if high_load_monitoring {
178//!     // Critical system monitoring - active only during high load
179//!     let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
180//!         Arc::new(|old, new| {
181//!             println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
182//!         }),
183//!         |_, new| *new > 80.0
184//!     )?;
185//!
186//!     let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
187//!         Arc::new(|old, new| {
188//!             println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
189//!         }),
190//!         |_, new| *new > 8192 // > 8GB
191//!     )?;
192//!
193//!     // Simulate system load changes
194//!     cpu_usage.set(85.0)?;     // Would trigger critical alert
195//!     memory_usage.set(9216)?;  // Would trigger memory warning
196//!     
197//!     // All monitoring automatically stops when exiting this block
198//! }
199//!
200//! // Connection monitoring with scoped lifetime
201//! {
202//!     let _connection_monitor = active_connections.subscribe_with_subscription(
203//!         Arc::new(|old, new| {
204//!             if new > old {
205//!                 println!("📈 New connections: {} -> {}", old, new);
206//!             } else if new < old {
207//!                 println!("📉 Connections closed: {} -> {}", old, new);
208//!             }
209//!         })
210//!     )?;
211//!
212//!     active_connections.set(5)?;  // Prints: "📈 New connections: 0 -> 5"
213//!     active_connections.set(3)?;  // Prints: "📉 Connections closed: 5 -> 3"
214//!     active_connections.set(8)?;  // Prints: "📈 New connections: 3 -> 8"
215//!
216//!     // Connection monitoring automatically stops here
217//! }
218//!
219//! // No monitoring active anymore
220//! cpu_usage.set(95.0)?;         // No output
221//! memory_usage.set(10240)?;     // No output  
222//! active_connections.set(15)?;  // No output
223//!
224//! println!("All monitoring automatically cleaned up!");
225//! # Ok(())
226//! # }
227//! ```
228
229use std::collections::HashMap;
230use std::fmt;
231use std::panic;
232use std::sync::{Arc, RwLock};
233use std::thread;
234
/// Default maximum number of background threads used for asynchronous observer notifications
///
/// `ObservableProperty::new` stores this value as the property's thread cap, and
/// `ObservableProperty::with_max_threads` falls back to it when called with `0`.
/// The cap controls the degree of parallelism when using `set_async()` to notify
/// observers. The observer list is divided into batches, with each batch running in
/// its own background thread, up to this maximum number of threads per call.
///
/// # Rationale
///
/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead
/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
/// - **System Responsiveness**: Limits thread contention on multi-core systems
///
/// # Implementation Details
///
/// When `set_async()` is called:
/// 1. All observers are collected into a snapshot
/// 2. The snapshot is split into chunks of `ceil(observer_count / max_threads)` observers,
///    which yields at most `max_threads` batches (fewer if there are fewer observers)
/// 3. Each batch executes in its own `thread::spawn()` call
/// 4. Observers within each batch are executed sequentially
///
/// For example, with 100 observers and the default cap of 4:
/// - Batch 1: 25 observers (Thread 1)
/// - Batch 2: 25 observers (Thread 2)
/// - Batch 3: 25 observers (Thread 3)
/// - Batch 4: 25 observers (Thread 4)
///
/// Which observer lands in which batch is unspecified: the snapshot is taken from a
/// `HashMap`, whose iteration order is arbitrary.
///
/// # Tuning Considerations
///
/// The constant itself is private. To use a different cap for a particular property,
/// construct it with `ObservableProperty::with_max_threads`:
/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
/// - **Memory-constrained systems**: Lower values reduce thread overhead
/// - **Real-time systems**: Lower values reduce scheduling unpredictability
///
/// # Thread Safety
///
/// The cap is copied into each property at construction time and is only read when
/// `set_async()` computes its batch size; it does not affect the thread safety of the
/// overall system.
const MAX_THREADS: usize = 4;
/// Errors that can occur when working with ObservableProperty
///
/// The type implements `PartialEq`/`Eq` so callers and tests can compare errors
/// directly (for example with `assert_eq!`) instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property
    ReadLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Failed to acquire a write lock on the property
    WriteLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Attempted to unsubscribe an observer that doesn't exist
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: usize,
    },
    /// The property's lock has been poisoned due to a panic in another thread
    PoisonedLock,
    /// An observer function encountered an error during execution
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },
    /// The thread pool for async notifications is exhausted
    ThreadPoolExhausted,
    /// Invalid configuration was provided
    InvalidConfiguration {
        /// Description of the invalid configuration
        reason: String,
    },
}
308
309impl fmt::Display for PropertyError {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        match self {
312            PropertyError::ReadLockError { context } => {
313                write!(f, "Failed to acquire read lock: {}", context)
314            }
315            PropertyError::WriteLockError { context } => {
316                write!(f, "Failed to acquire write lock: {}", context)
317            }
318            PropertyError::ObserverNotFound { id } => {
319                write!(f, "Observer with ID {} not found", id)
320            }
321            PropertyError::PoisonedLock => {
322                write!(
323                    f,
324                    "Property is in a poisoned state due to a panic in another thread"
325                )
326            }
327            PropertyError::ObserverError { reason } => {
328                write!(f, "Observer execution failed: {}", reason)
329            }
330            PropertyError::ThreadPoolExhausted => {
331                write!(f, "Thread pool is exhausted and cannot spawn more observers")
332            }
333            PropertyError::InvalidConfiguration { reason } => {
334                write!(f, "Invalid configuration: {}", reason)
335            }
336        }
337    }
338}
339
// Marker impl: the `Debug` and `Display` impls supply everything `Error` requires.
// No `source()` override is provided, so the default (no underlying cause) applies.
impl std::error::Error for PropertyError {}
341
/// Function type for observers that get called when property values change
///
/// The callback receives the old value first and the new value second, both by
/// reference. It is shared through an `Arc` and must be `Send + Sync`.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
344
/// Unique identifier for registered observers
///
/// IDs are handed out by `subscribe()` from a per-property counter that starts at 0,
/// so an ID is only meaningful for the property that issued it.
pub type ObserverId = usize;
347
348/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
349///
350/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
351/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
352/// `Drop` implementation automatically removes the associated observer from the property.
353///
354/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
355/// leaks in scenarios where observers might otherwise be forgotten.
356///
357/// # Type Requirements
358///
359/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
360/// - `Clone`: Required for observer notifications
361/// - `Send`: Required for transferring between threads  
362/// - `Sync`: Required for concurrent access from multiple threads
363/// - `'static`: Required for observer callbacks that may outlive the original scope
364///
365/// # Examples
366///
367/// ## Basic RAII Subscription
368///
369/// ```rust
370/// use observable_property::ObservableProperty;
371/// use std::sync::Arc;
372///
373/// # fn main() -> Result<(), observable_property::PropertyError> {
374/// let property = ObservableProperty::new(0);
375///
376/// {
377///     // Create subscription - observer is automatically registered
378///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
379///         println!("Value changed: {} -> {}", old, new);
380///     }))?;
381///
382///     property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
383///
384///     // When _subscription goes out of scope here, observer is automatically removed
385/// }
386///
387/// property.set(100)?; // No observer output - subscription was automatically cleaned up
388/// # Ok(())
389/// # }
390/// ```
391///
392/// ## Cross-Thread Subscription Management
393///
394/// ```rust
395/// use observable_property::ObservableProperty;
396/// use std::sync::Arc;
397/// use std::thread;
398///
399/// # fn main() -> Result<(), observable_property::PropertyError> {
400/// let property = Arc::new(ObservableProperty::new(0));
401/// let property_clone = property.clone();
402///
403/// // Create subscription in main thread
404/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
405///     println!("Observed: {} -> {}", old, new);
406/// }))?;
407///
408/// // Move subscription to another thread for cleanup
409/// let handle = thread::spawn(move || {
410///     // Subscription is still active here
411///     let _ = property_clone.set(42); // Will trigger observer
412///     
413///     // When subscription is dropped here (end of thread), observer is cleaned up
414///     drop(subscription);
415/// });
416///
417/// handle.join().unwrap();
418/// 
419/// // Observer is no longer active
420/// property.set(100)?; // No output
421/// # Ok(())
422/// # }
423/// ```
424///
425/// ## Conditional Scoped Subscriptions
426///
427/// ```rust
428/// use observable_property::ObservableProperty;
429/// use std::sync::Arc;
430///
431/// # fn main() -> Result<(), observable_property::PropertyError> {
432/// let counter = ObservableProperty::new(0);
433/// let debug_mode = true;
434///
435/// if debug_mode {
436///     let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
437///         println!("Debug: counter {} -> {}", old, new);
438///     }))?;
439///     
440///     counter.set(1)?; // Prints debug info
441///     counter.set(2)?; // Prints debug info
442///     
443///     // Debug subscription automatically cleaned up when exiting if block
444/// }
445///
446/// counter.set(3)?; // No debug output (subscription was cleaned up)
447/// # Ok(())
448/// # }
449/// ```
450///
451/// # Thread Safety
452///
453/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
454/// sent between threads and the automatic cleanup will work correctly even if the
455/// subscription is dropped from a different thread than where it was created.
pub struct Subscription<T: Clone + Send + Sync + 'static> {
    /// Shared handle to the property state whose observer map holds this subscription's observer
    inner: Arc<RwLock<InnerProperty<T>>>,
    /// Key of the observer that is removed from `inner` when this guard is dropped
    id: ObserverId,
}
460
461impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
462    /// Debug implementation that shows the subscription ID without exposing internals
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        f.debug_struct("Subscription")
465            .field("id", &self.id)
466            .field("inner", &"[ObservableProperty]")
467            .finish()
468    }
469}
470
471impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
472    /// Automatically removes the associated observer when the subscription is dropped
473    ///
474    /// This implementation provides automatic cleanup by removing the observer
475    /// from the property's observer list when the `Subscription` goes out of scope.
476    ///
477    /// # Error Handling
478    ///
479    /// If the property's lock is poisoned or inaccessible during cleanup, the error
480    /// is silently ignored using the `let _ = ...` pattern. This is intentional
481    /// because:
482    /// 1. Drop implementations should not panic
483    /// 2. If the property is poisoned, it's likely unusable anyway
484    /// 3. There's no meaningful way to handle cleanup errors in a destructor
485    ///
486    /// # Thread Safety
487    ///
488    /// This method is safe to call from any thread, even if the subscription
489    /// was created on a different thread.
490    fn drop(&mut self) {
491        let _ = self.inner.write().map(|mut prop| {
492            prop.observers.remove(&self.id);
493        });
494    }
495}
496
497/// A thread-safe observable property that notifies observers when its value changes
498///
499/// This type wraps a value of type `T` and allows multiple observers to be notified
500/// whenever the value is modified. All operations are thread-safe and can be called
501/// from multiple threads concurrently.
502///
503/// # Type Requirements
504///
505/// The generic type `T` must implement:
506/// - `Clone`: Required for returning values and passing them to observers
507/// - `Send`: Required for transferring between threads
508/// - `Sync`: Required for concurrent access from multiple threads  
509/// - `'static`: Required for observer callbacks that may outlive the original scope
510///
511/// # Examples
512///
513/// ```rust
514/// use observable_property::ObservableProperty;
515/// use std::sync::Arc;
516///
517/// let property = ObservableProperty::new("initial".to_string());
518///
519/// let observer_id = property.subscribe(Arc::new(|old, new| {
520///     println!("Changed from '{}' to '{}'", old, new);
521/// })).map_err(|e| {
522///     eprintln!("Failed to subscribe: {}", e);
523///     e
524/// })?;
525///
526/// property.set("updated".to_string()).map_err(|e| {
527///     eprintln!("Failed to set value: {}", e);
528///     e
529/// })?; // Prints: Changed from 'initial' to 'updated'
530///
531/// property.unsubscribe(observer_id).map_err(|e| {
532///     eprintln!("Failed to unsubscribe: {}", e);
533///     e
534/// })?;
535/// # Ok::<(), observable_property::PropertyError>(())
536/// ```
pub struct ObservableProperty<T> {
    /// Current value and observer registry, guarded together by a single `RwLock`
    inner: Arc<RwLock<InnerProperty<T>>>,
    /// Upper bound on the number of batch threads `set_async()` spawns per call
    max_threads: usize,
}
541
/// State stored behind the property's `RwLock`: the current value plus the observer registry
struct InnerProperty<T> {
    /// The current value of the property
    value: T,
    /// Registered observers, keyed by the ID that `subscribe()` returned for them
    observers: HashMap<ObserverId, Observer<T>>,
    /// The ID the next call to `subscribe()` will hand out
    next_id: ObserverId,
}
547
548impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
549    /// Creates a new observable property with the given initial value
550    ///
551    /// # Arguments
552    ///
553    /// * `initial_value` - The starting value for this property
554    ///
555    /// # Examples
556    ///
557    /// ```rust
558    /// use observable_property::ObservableProperty;
559    ///
560    /// let property = ObservableProperty::new(42);
561    /// match property.get() {
562    ///     Ok(value) => assert_eq!(value, 42),
563    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
564    /// }
565    /// ```
566    pub fn new(initial_value: T) -> Self {
567        Self {
568            inner: Arc::new(RwLock::new(InnerProperty {
569                value: initial_value,
570                observers: HashMap::new(),
571                next_id: 0,
572            })),
573            max_threads: MAX_THREADS,
574        }
575    }
576
577    pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
578        let max_threads = if max_threads == 0 {
579            MAX_THREADS
580        } else {
581            max_threads
582        };
583        Self {
584            inner: Arc::new(RwLock::new(InnerProperty {
585                value: initial_value,
586                observers: HashMap::new(),
587                next_id: 0,
588            })),
589            max_threads,
590        }
591    }
592
593    /// Gets the current value of the property
594    ///
595    /// This method acquires a read lock, which allows multiple concurrent readers
596    /// but will block if a writer currently holds the lock.
597    ///
598    /// # Returns
599    ///
600    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
601    /// if the lock is poisoned.
602    ///
603    /// # Examples
604    ///
605    /// ```rust
606    /// use observable_property::ObservableProperty;
607    ///
608    /// let property = ObservableProperty::new("hello".to_string());
609    /// match property.get() {
610    ///     Ok(value) => assert_eq!(value, "hello"),
611    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
612    /// }
613    /// ```
614    pub fn get(&self) -> Result<T, PropertyError> {
615        self.inner
616            .read()
617            .map(|prop| prop.value.clone())
618            .map_err(|_| PropertyError::PoisonedLock)
619    }
620
621    /// Sets the property to a new value and notifies all observers
622    ///
623    /// This method will:
624    /// 1. Acquire a write lock (blocking other readers/writers)
625    /// 2. Update the value and capture a snapshot of observers
626    /// 3. Release the lock
627    /// 4. Notify all observers sequentially with the old and new values
628    ///
629    /// Observer notifications are wrapped in panic recovery to prevent one
630    /// misbehaving observer from affecting others.
631    ///
632    /// # Arguments
633    ///
634    /// * `new_value` - The new value to set
635    ///
636    /// # Returns
637    ///
638    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
639    ///
640    /// # Examples
641    ///
642    /// ```rust
643    /// use observable_property::ObservableProperty;
644    /// use std::sync::Arc;
645    ///
646    /// let property = ObservableProperty::new(10);
647    ///
648    /// property.subscribe(Arc::new(|old, new| {
649    ///     println!("Value changed from {} to {}", old, new);
650    /// })).map_err(|e| {
651    ///     eprintln!("Failed to subscribe: {}", e);
652    ///     e
653    /// })?;
654    ///
655    /// property.set(20).map_err(|e| {
656    ///     eprintln!("Failed to set property value: {}", e);
657    ///     e
658    /// })?; // Triggers observer notification
659    /// # Ok::<(), observable_property::PropertyError>(())
660    /// ```
661    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
662        let (old_value, observers_snapshot) = {
663            let mut prop = self
664                .inner
665                .write()
666                .map_err(|_| PropertyError::WriteLockError {
667                    context: "setting property value".to_string(),
668                })?;
669
670            let old_value = prop.value.clone();
671            prop.value = new_value.clone();
672            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
673            (old_value, observers_snapshot)
674        };
675
676        for observer in observers_snapshot {
677            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
678                observer(&old_value, &new_value);
679            })) {
680                eprintln!("Observer panic: {:?}", e);
681            }
682        }
683
684        Ok(())
685    }
686
687    /// Sets the property to a new value and notifies observers asynchronously
688    ///
689    /// This method is similar to `set()` but spawns observers in background threads
690    /// for non-blocking operation. This is useful when observers might perform
691    /// time-consuming operations.
692    ///
693    /// Observers are batched into groups and each batch runs in its own thread
694    /// to limit resource usage while still providing parallelism.
695    ///
696    /// # Arguments
697    ///
698    /// * `new_value` - The new value to set
699    ///
700    /// # Returns
701    ///
702    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
703    /// Note that this only indicates the property was updated successfully;
704    /// observer execution happens asynchronously.
705    ///
706    /// # Examples
707    ///
708    /// ```rust
709    /// use observable_property::ObservableProperty;
710    /// use std::sync::Arc;
711    /// use std::time::Duration;
712    ///
713    /// let property = ObservableProperty::new(0);
714    ///
715    /// property.subscribe(Arc::new(|old, new| {
716    ///     // This observer does slow work but won't block the caller
717    ///     std::thread::sleep(Duration::from_millis(100));
718    ///     println!("Slow observer: {} -> {}", old, new);
719    /// })).map_err(|e| {
720    ///     eprintln!("Failed to subscribe: {}", e);
721    ///     e
722    /// })?;
723    ///
724    /// // This returns immediately even though observer is slow
725    /// property.set_async(42).map_err(|e| {
726    ///     eprintln!("Failed to set value asynchronously: {}", e);
727    ///     e
728    /// })?;
729    /// # Ok::<(), observable_property::PropertyError>(())
730    /// ```
731    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
732        let (old_value, observers_snapshot) = {
733            let mut prop = self
734                .inner
735                .write()
736                .map_err(|_| PropertyError::WriteLockError {
737                    context: "setting property value".to_string(),
738                })?;
739
740            let old_value = prop.value.clone();
741            prop.value = new_value.clone();
742            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
743            (old_value, observers_snapshot)
744        };
745
746        if observers_snapshot.is_empty() {
747            return Ok(());
748        }
749
750        let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
751
752        for batch in observers_snapshot.chunks(observers_per_thread) {
753            let batch_observers = batch.to_vec();
754            let old_val = old_value.clone();
755            let new_val = new_value.clone();
756
757            thread::spawn(move || {
758                for observer in batch_observers {
759                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
760                        observer(&old_val, &new_val);
761                    })) {
762                        eprintln!("Observer panic in batch thread: {:?}", e);
763                    }
764                }
765            });
766        }
767
768        Ok(())
769    }
770
771    /// Subscribes an observer function to be called when the property changes
772    ///
773    /// The observer function will be called with the old and new values whenever
774    /// the property is modified via `set()` or `set_async()`.
775    ///
776    /// # Arguments
777    ///
778    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
779    ///
780    /// # Returns
781    ///
782    /// `Ok(ObserverId)` containing a unique identifier for this observer,
783    /// or `Err(PropertyError)` if the lock is poisoned.
784    ///
785    /// # Examples
786    ///
787    /// ```rust
788    /// use observable_property::ObservableProperty;
789    /// use std::sync::Arc;
790    ///
791    /// let property = ObservableProperty::new(0);
792    ///
793    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
794    ///     println!("Property changed from {} to {}", old_value, new_value);
795    /// })).map_err(|e| {
796    ///     eprintln!("Failed to subscribe observer: {}", e);
797    ///     e
798    /// })?;
799    ///
800    /// // Later, unsubscribe using the returned ID
801    /// property.unsubscribe(observer_id).map_err(|e| {
802    ///     eprintln!("Failed to unsubscribe observer: {}", e);
803    ///     e
804    /// })?;
805    /// # Ok::<(), observable_property::PropertyError>(())
806    /// ```
807    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
808        let mut prop = self
809            .inner
810            .write()
811            .map_err(|_| PropertyError::WriteLockError {
812                context: "subscribing observer".to_string(),
813            })?;
814
815        let id = prop.next_id;
816        prop.next_id += 1;
817        prop.observers.insert(id, observer);
818        Ok(id)
819    }
820
821    /// Removes an observer identified by its ID
822    ///
823    /// # Arguments
824    ///
825    /// * `id` - The observer ID returned by `subscribe()`
826    ///
827    /// # Returns
828    ///
829    /// `Ok(bool)` where `true` means the observer was found and removed,
830    /// `false` means no observer with that ID existed.
831    /// Returns `Err(PropertyError)` if the lock is poisoned.
832    ///
833    /// # Examples
834    ///
835    /// ```rust
836    /// use observable_property::ObservableProperty;
837    /// use std::sync::Arc;
838    ///
839    /// let property = ObservableProperty::new(0);
840    /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
841    ///     eprintln!("Failed to subscribe: {}", e);
842    ///     e
843    /// })?;
844    ///
845    /// let was_removed = property.unsubscribe(id).map_err(|e| {
846    ///     eprintln!("Failed to unsubscribe: {}", e);
847    ///     e
848    /// })?;
849    /// assert!(was_removed); // Observer existed and was removed
850    ///
851    /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
852    ///     eprintln!("Failed to unsubscribe again: {}", e);
853    ///     e
854    /// })?;
855    /// assert!(!was_removed_again); // Observer no longer exists
856    /// # Ok::<(), observable_property::PropertyError>(())
857    /// ```
858    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
859        let mut prop = self
860            .inner
861            .write()
862            .map_err(|_| PropertyError::WriteLockError {
863                context: "unsubscribing observer".to_string(),
864            })?;
865
866        let was_present = prop.observers.remove(&id).is_some();
867        Ok(was_present)
868    }
869
870    /// Subscribes an observer that only gets called when a filter condition is met
871    ///
872    /// This is useful for observing only specific types of changes, such as
873    /// when a value increases or crosses a threshold.
874    ///
875    /// # Arguments
876    ///
877    /// * `observer` - The observer function to call when the filter passes
878    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
879    ///
880    /// # Returns
881    ///
882    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
883    ///
884    /// # Examples
885    ///
886    /// ```rust
887    /// use observable_property::ObservableProperty;
888    /// use std::sync::Arc;
889    ///
890    /// let property = ObservableProperty::new(0);
891    ///
892    /// // Only notify when value increases
893    /// let id = property.subscribe_filtered(
894    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
895    ///     |old, new| new > old
896    /// ).map_err(|e| {
897    ///     eprintln!("Failed to subscribe filtered observer: {}", e);
898    ///     e
899    /// })?;
900    ///
901    /// property.set(10).map_err(|e| {
902    ///     eprintln!("Failed to set value: {}", e);
903    ///     e
904    /// })?; // Triggers observer (0 -> 10)
905    /// property.set(5).map_err(|e| {
906    ///     eprintln!("Failed to set value: {}", e);
907    ///     e
908    /// })?;  // Does NOT trigger observer (10 -> 5)
909    /// property.set(15).map_err(|e| {
910    ///     eprintln!("Failed to set value: {}", e);
911    ///     e
912    /// })?; // Triggers observer (5 -> 15)
913    /// # Ok::<(), observable_property::PropertyError>(())
914    /// ```
915    pub fn subscribe_filtered<F>(
916        &self,
917        observer: Observer<T>,
918        filter: F,
919    ) -> Result<ObserverId, PropertyError>
920    where
921        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
922    {
923        let filter = Arc::new(filter);
924        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
925            if filter(old_val, new_val) {
926                observer(old_val, new_val);
927            }
928        });
929
930        self.subscribe(filtered_observer)
931    }
932
933    pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
934        let prop = self
935            .inner
936            .read()
937            .map_err(|_| PropertyError::ReadLockError {
938                context: "notifying observers".to_string(),
939            })?;
940
941        for (old_val, new_val) in changes {
942            for observer in prop.observers.values() {
943                observer(&old_val, &new_val);
944            }
945        }
946        Ok(())
947    }
948
949    /// Subscribes an observer and returns a RAII guard for automatic cleanup
950    ///
951    /// This method is similar to `subscribe()` but returns a `Subscription` object
952    /// that automatically removes the observer when it goes out of scope. This
953    /// provides a more convenient and safer alternative to manual subscription
954    /// management.
955    ///
956    /// # Arguments
957    ///
958    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
959    ///
960    /// # Returns
961    ///
962    /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
963    /// or `Err(PropertyError)` if the lock is poisoned.
964    ///
965    /// # Examples
966    ///
967    /// ## Basic RAII Subscription
968    ///
969    /// ```rust
970    /// use observable_property::ObservableProperty;
971    /// use std::sync::Arc;
972    ///
973    /// # fn main() -> Result<(), observable_property::PropertyError> {
974    /// let property = ObservableProperty::new(0);
975    ///
976    /// {
977    ///     let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
978    ///         println!("Value: {} -> {}", old, new);
979    ///     }))?;
980    ///
981    ///     property.set(42)?; // Prints: "Value: 0 -> 42"
982    ///     property.set(100)?; // Prints: "Value: 42 -> 100"
983    ///
984    ///     // Automatic cleanup when _subscription goes out of scope
985    /// }
986    ///
987    /// property.set(200)?; // No output - subscription was cleaned up
988    /// # Ok(())
989    /// # }
990    /// ```
991    ///
992    /// ## Comparison with Manual Management
993    ///
994    /// ```rust
995    /// use observable_property::ObservableProperty;
996    /// use std::sync::Arc;
997    ///
998    /// # fn main() -> Result<(), observable_property::PropertyError> {
999    /// let property = ObservableProperty::new("initial".to_string());
1000    ///
1001    /// // Method 1: Manual subscription management (traditional approach)
1002    /// let observer_id = property.subscribe(Arc::new(|old, new| {
1003    ///     println!("Manual: {} -> {}", old, new);
1004    /// }))?;
1005    ///
1006    /// // Method 2: RAII subscription management (recommended)
1007    /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1008    ///     println!("RAII: {} -> {}", old, new);
1009    /// }))?;
1010    ///
1011    /// // Both observers will be called
1012    /// property.set("changed".to_string())?;
1013    /// // Prints:
1014    /// // "Manual: initial -> changed"
1015    /// // "RAII: initial -> changed"
1016    ///
1017    /// // Manual cleanup required for first observer
1018    /// property.unsubscribe(observer_id)?;
1019    ///
1020    /// // Second observer (_subscription) is automatically cleaned up when
1021    /// // the variable goes out of scope - no manual intervention needed
1022    /// # Ok(())
1023    /// # }
1024    /// ```
1025    ///
1026    /// ## Error Handling with Early Returns
1027    ///
1028    /// ```rust
1029    /// use observable_property::ObservableProperty;
1030    /// use std::sync::Arc;
1031    ///
1032    /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1033    ///     let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1034    ///         println!("Processing: {} -> {}", old, new);
1035    ///     }))?;
1036    ///
1037    ///     property.set(1)?;
1038    ///     
1039    ///     if property.get()? > 0 {
1040    ///         return Ok(()); // Subscription automatically cleaned up on early return
1041    ///     }
1042    ///
1043    ///     property.set(2)?;
1044    ///     Ok(()) // Subscription automatically cleaned up on normal return
1045    /// }
1046    ///
1047    /// # fn main() -> Result<(), observable_property::PropertyError> {
1048    /// let property = ObservableProperty::new(0);
1049    /// process_with_monitoring(&property)?; // Monitoring active only during function call
1050    /// property.set(99)?; // No monitoring output - subscription was cleaned up
1051    /// # Ok(())
1052    /// # }
1053    /// ```
1054    ///
1055    /// ## Multi-threaded Subscription Management
1056    ///
1057    /// ```rust
1058    /// use observable_property::ObservableProperty;
1059    /// use std::sync::Arc;
1060    /// use std::thread;
1061    ///
1062    /// # fn main() -> Result<(), observable_property::PropertyError> {
1063    /// let property = Arc::new(ObservableProperty::new(0));
1064    /// let property_clone = property.clone();
1065    ///
1066    /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1067    ///     let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1068    ///         println!("Thread observer: {} -> {}", old, new);
1069    ///     }))?;
1070    ///
1071    ///     property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1072    ///     
1073    ///     // Subscription automatically cleaned up when thread ends
1074    ///     Ok(())
1075    /// });
1076    ///
1077    /// handle.join().unwrap()?;
1078    /// property.set(100)?; // No output - thread subscription was cleaned up
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    ///
1083    /// # Use Cases
1084    ///
1085    /// This method is particularly useful in scenarios such as:
1086    /// - Temporary observers that should be active only during a specific scope
1087    /// - Error-prone code where manual cleanup might be forgotten
1088    /// - Complex control flow where multiple exit points make manual cleanup difficult
1089    /// - Resource-constrained environments where observer leaks are problematic
1090    pub fn subscribe_with_subscription(
1091        &self,
1092        observer: Observer<T>,
1093    ) -> Result<Subscription<T>, PropertyError> {
1094        let id = self.subscribe(observer)?;
1095        Ok(Subscription {
1096            inner: Arc::clone(&self.inner),
1097            id,
1098        })
1099    }
1100
1101    /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1102    ///
1103    /// This method combines the functionality of `subscribe_filtered()` with the automatic
1104    /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1105    /// called when the filter condition is satisfied, and it will be automatically
1106    /// unsubscribed when the returned `Subscription` goes out of scope.
1107    ///
1108    /// # Arguments
1109    ///
1110    /// * `observer` - The observer function to call when the filter passes
1111    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1112    ///
1113    /// # Returns
1114    ///
1115    /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1116    /// or `Err(PropertyError)` if the lock is poisoned.
1117    ///
1118    /// # Examples
1119    ///
1120    /// ## Basic Filtered RAII Subscription
1121    ///
1122    /// ```rust
1123    /// use observable_property::ObservableProperty;
1124    /// use std::sync::Arc;
1125    ///
1126    /// # fn main() -> Result<(), observable_property::PropertyError> {
1127    /// let counter = ObservableProperty::new(0);
1128    ///
1129    /// {
1130    ///     // Monitor only increases with automatic cleanup
1131    ///     let _increase_monitor = counter.subscribe_filtered_with_subscription(
1132    ///         Arc::new(|old, new| {
1133    ///             println!("Counter increased: {} -> {}", old, new);
1134    ///         }),
1135    ///         |old, new| new > old
1136    ///     )?;
1137    ///
1138    ///     counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
1139    ///     counter.set(3)?;  // No output (decrease)
1140    ///     counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"
1141    ///
1142    ///     // Subscription automatically cleaned up when leaving scope
1143    /// }
1144    ///
1145    /// counter.set(10)?; // No output - subscription was cleaned up
1146    /// # Ok(())
1147    /// # }
1148    /// ```
1149    ///
1150    /// ## Multi-Condition Temperature Monitoring
1151    ///
1152    /// ```rust
1153    /// use observable_property::ObservableProperty;
1154    /// use std::sync::Arc;
1155    ///
1156    /// # fn main() -> Result<(), observable_property::PropertyError> {
1157    /// let temperature = ObservableProperty::new(20.0_f64);
1158    ///
1159    /// {
1160    ///     // Create filtered subscription that only triggers for significant temperature increases
1161    ///     let _heat_warning = temperature.subscribe_filtered_with_subscription(
1162    ///         Arc::new(|old_temp, new_temp| {
1163    ///             println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1164    ///                      old_temp, new_temp);
1165    ///         }),
1166    ///         |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
1167    ///     )?;
1168    ///
1169    ///     // Create another filtered subscription for cooling alerts
1170    ///     let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1171    ///         Arc::new(|old_temp, new_temp| {
1172    ///             println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1173    ///                      old_temp, new_temp);
1174    ///         }),
1175    ///         |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
1176    ///     )?;
1177    ///
1178    ///     // Test the filters
1179    ///     temperature.set(22.0)?; // No alerts (increase of only 2°C)
1180    ///     temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1181    ///     temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1182    ///
1183    ///     // Both subscriptions are automatically cleaned up when they go out of scope
1184    /// }
1185    ///
1186    /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1187    /// # Ok(())
1188    /// # }
1189    /// ```
1190    ///
1191    /// ## Conditional Monitoring with Complex Filters
1192    ///
1193    /// ```rust
1194    /// use observable_property::ObservableProperty;
1195    /// use std::sync::Arc;
1196    ///
1197    /// # fn main() -> Result<(), observable_property::PropertyError> {
1198    /// let stock_price = ObservableProperty::new(100.0_f64);
1199    ///
1200    /// {
1201    ///     // Monitor significant price movements (> 5% change)
1202    ///     let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1203    ///         Arc::new(|old_price, new_price| {
1204    ///             let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1205    ///             println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1206    ///                     old_price, new_price, change_percent);
1207    ///         }),
1208    ///         |old, new| {
1209    ///             let change_percent = ((new - old) / old * 100.0).abs();
1210    ///             change_percent > 5.0  // Trigger on > 5% change
1211    ///         }
1212    ///     )?;
1213    ///
1214    ///     stock_price.set(103.0)?; // No alert (3% change)
    ///     stock_price.set(108.0)?; // No alert (4.85% change from 103 is below the 5% threshold)
1216    ///     stock_price.set(95.0)?;  // Alert triggered (12% decrease)
1217    ///
1218    ///     // Subscription automatically cleaned up when leaving scope
1219    /// }
1220    ///
1221    /// stock_price.set(200.0)?; // No alert - monitoring ended
1222    /// # Ok(())
1223    /// # }
1224    /// ```
1225    ///
1226    /// ## Cross-Thread Filtered Monitoring
1227    ///
1228    /// ```rust
1229    /// use observable_property::ObservableProperty;
1230    /// use std::sync::Arc;
1231    /// use std::thread;
1232    /// use std::time::Duration;
1233    ///
1234    /// # fn main() -> Result<(), observable_property::PropertyError> {
1235    /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1236    /// let latency_clone = network_latency.clone();
1237    ///
1238    /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1239    ///     // Monitor high latency in background thread with automatic cleanup
1240    ///     let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1241    ///         Arc::new(|old_ms, new_ms| {
1242    ///             println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1243    ///         }),
1244    ///         |_, new| *new > 100  // Alert when latency exceeds 100ms
1245    ///     )?;
1246    ///
1247    ///     // Simulate monitoring for a short time
1248    ///     thread::sleep(Duration::from_millis(10));
1249    ///     
1250    ///     // Subscription automatically cleaned up when thread ends
1251    ///     Ok(())
1252    /// });
1253    ///
1254    /// // Simulate network conditions
1255    /// network_latency.set(80)?;  // No alert (under threshold)
    /// network_latency.set(150)?; // Alert fires only if the monitor thread's subscription is active at this moment
1257    ///
1258    /// monitor_handle.join().unwrap()?;
1259    /// network_latency.set(200)?; // No alert - background monitoring ended
1260    /// # Ok(())
1261    /// # }
1262    /// ```
1263    ///
1264    /// # Use Cases
1265    ///
1266    /// This method is ideal for:
1267    /// - Threshold-based monitoring with automatic cleanup
1268    /// - Temporary conditional observers in specific code blocks
1269    /// - Event-driven systems where observers should be active only during certain phases
1270    /// - Resource management scenarios where filtered observers have limited lifetimes
1271    ///
1272    /// # Performance Notes
1273    ///
1274    /// The filter function is evaluated for every property change, so it should be
1275    /// lightweight. Complex filtering logic should be optimized to avoid performance
1276    /// bottlenecks, especially in high-frequency update scenarios.
1277    pub fn subscribe_filtered_with_subscription<F>(
1278        &self,
1279        observer: Observer<T>,
1280        filter: F,
1281    ) -> Result<Subscription<T>, PropertyError>
1282    where
1283        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1284    {
1285        let id = self.subscribe_filtered(observer, filter)?;
1286        Ok(Subscription {
1287            inner: Arc::clone(&self.inner),
1288            id,
1289        })
1290    }
1291}
1292
1293impl<T: Clone> Clone for ObservableProperty<T> {
1294    /// Creates a new reference to the same observable property
1295    ///
1296    /// This creates a new `ObservableProperty` instance that shares the same
1297    /// underlying data with the original. Changes made through either instance
1298    /// will be visible to observers subscribed through both instances.
1299    ///
1300    /// # Examples
1301    ///
1302    /// ```rust
1303    /// use observable_property::ObservableProperty;
1304    /// use std::sync::Arc;
1305    ///
1306    /// let property1 = ObservableProperty::new(42);
1307    /// let property2 = property1.clone();
1308    ///
1309    /// property2.subscribe(Arc::new(|old, new| {
1310    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
1311    /// })).map_err(|e| {
1312    ///     eprintln!("Failed to subscribe: {}", e);
1313    ///     e
1314    /// })?;
1315    ///
1316    /// // This change through property1 will trigger the observer on property2
1317    /// property1.set(100).map_err(|e| {
1318    ///     eprintln!("Failed to set value: {}", e);
1319    ///     e
1320    /// })?;
1321    /// # Ok::<(), observable_property::PropertyError>(())
1322    /// ```
1323    fn clone(&self) -> Self {
1324        Self {
1325            inner: Arc::clone(&self.inner),
1326            max_threads: self.max_threads,
1327        }
1328    }
1329}
1330
1331impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1332    /// Debug implementation that shows the current value if accessible
1333    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334        match self.get() {
1335            Ok(value) => f
1336                .debug_struct("ObservableProperty")
1337                .field("value", &value)
1338                .field("observers_count", &"[hidden]")
1339                .field("max_threads", &self.max_threads)
1340                .finish(),
1341            Err(_) => f
1342                .debug_struct("ObservableProperty")
1343                .field("value", &"[inaccessible]")
1344                .field("observers_count", &"[hidden]")
1345                .finish(),
1346        }
1347    }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352    use super::*;
1353    use std::sync::atomic::{AtomicUsize, Ordering};
1354    use std::time::Duration;
1355
1356    #[test]
1357    fn test_property_creation_and_basic_operations() {
1358        let prop = ObservableProperty::new(42);
1359
1360        // Test initial value
1361        match prop.get() {
1362            Ok(value) => assert_eq!(value, 42),
1363            Err(e) => panic!("Failed to get initial value: {}", e),
1364        }
1365
1366        // Test setting value
1367        if let Err(e) = prop.set(100) {
1368            panic!("Failed to set value: {}", e);
1369        }
1370
1371        match prop.get() {
1372            Ok(value) => assert_eq!(value, 100),
1373            Err(e) => panic!("Failed to get updated value: {}", e),
1374        }
1375    }
1376
1377    #[test]
1378    fn test_observer_subscription_and_notification() {
1379        let prop = ObservableProperty::new("initial".to_string());
1380        let notification_count = Arc::new(AtomicUsize::new(0));
1381        let last_old_value = Arc::new(RwLock::new(String::new()));
1382        let last_new_value = Arc::new(RwLock::new(String::new()));
1383
1384        let count_clone = notification_count.clone();
1385        let old_clone = last_old_value.clone();
1386        let new_clone = last_new_value.clone();
1387
1388        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
1389            count_clone.fetch_add(1, Ordering::SeqCst);
1390            if let Ok(mut old_val) = old_clone.write() {
1391                *old_val = old.clone();
1392            }
1393            if let Ok(mut new_val) = new_clone.write() {
1394                *new_val = new.clone();
1395            }
1396        })) {
1397            Ok(id) => id,
1398            Err(e) => panic!("Failed to subscribe observer: {}", e),
1399        };
1400
1401        // Change value and verify notification
1402        if let Err(e) = prop.set("changed".to_string()) {
1403            panic!("Failed to set property value: {}", e);
1404        }
1405
1406        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1407
1408        match last_old_value.read() {
1409            Ok(old_val) => assert_eq!(*old_val, "initial"),
1410            Err(e) => panic!("Failed to read old value: {:?}", e),
1411        }
1412
1413        match last_new_value.read() {
1414            Ok(new_val) => assert_eq!(*new_val, "changed"),
1415            Err(e) => panic!("Failed to read new value: {:?}", e),
1416        }
1417
1418        // Test unsubscription
1419        match prop.unsubscribe(observer_id) {
1420            Ok(was_present) => assert!(was_present),
1421            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
1422        }
1423
1424        // Change value again - should not notify
1425        if let Err(e) = prop.set("not_notified".to_string()) {
1426            panic!("Failed to set property value after unsubscribe: {}", e);
1427        }
1428        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1429    }
1430
1431    #[test]
1432    fn test_filtered_observer() {
1433        let prop = ObservableProperty::new(0i32);
1434        let notification_count = Arc::new(AtomicUsize::new(0));
1435        let count_clone = notification_count.clone();
1436
1437        // Observer only triggered when value increases
1438        let observer_id = match prop.subscribe_filtered(
1439            Arc::new(move |_, _| {
1440                count_clone.fetch_add(1, Ordering::SeqCst);
1441            }),
1442            |old, new| new > old,
1443        ) {
1444            Ok(id) => id,
1445            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
1446        };
1447
1448        // Should trigger (0 -> 5)
1449        if let Err(e) = prop.set(5) {
1450            panic!("Failed to set property value to 5: {}", e);
1451        }
1452        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1453
1454        // Should NOT trigger (5 -> 3)
1455        if let Err(e) = prop.set(3) {
1456            panic!("Failed to set property value to 3: {}", e);
1457        }
1458        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
1459
1460        // Should trigger (3 -> 10)
1461        if let Err(e) = prop.set(10) {
1462            panic!("Failed to set property value to 10: {}", e);
1463        }
1464        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
1465
1466        match prop.unsubscribe(observer_id) {
1467            Ok(_) => {}
1468            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
1469        }
1470    }
1471
1472    #[test]
1473    fn test_thread_safety_concurrent_reads() {
1474        let prop = Arc::new(ObservableProperty::new(42i32));
1475        let num_threads = 10;
1476        let reads_per_thread = 100;
1477
1478        let handles: Vec<_> = (0..num_threads)
1479            .map(|_| {
1480                let prop_clone = prop.clone();
1481                thread::spawn(move || {
1482                    for _ in 0..reads_per_thread {
1483                        match prop_clone.get() {
1484                            Ok(value) => assert_eq!(value, 42),
1485                            Err(e) => panic!("Failed to read property value: {}", e),
1486                        }
1487                        thread::sleep(Duration::from_millis(1));
1488                    }
1489                })
1490            })
1491            .collect();
1492
1493        for handle in handles {
1494            if let Err(e) = handle.join() {
1495                panic!("Thread failed to complete: {:?}", e);
1496            }
1497        }
1498    }
1499
1500    #[test]
1501    fn test_async_set_performance() {
1502        let prop = ObservableProperty::new(0i32);
1503        let slow_observer_count = Arc::new(AtomicUsize::new(0));
1504        let count_clone = slow_observer_count.clone();
1505
1506        // Add observer that simulates slow work
1507        let _id = match prop.subscribe(Arc::new(move |_, _| {
1508            thread::sleep(Duration::from_millis(50));
1509            count_clone.fetch_add(1, Ordering::SeqCst);
1510        })) {
1511            Ok(id) => id,
1512            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
1513        };
1514
1515        // Test synchronous set (should be slow)
1516        let start = std::time::Instant::now();
1517        if let Err(e) = prop.set(1) {
1518            panic!("Failed to set property value synchronously: {}", e);
1519        }
1520        let sync_duration = start.elapsed();
1521
1522        // Test asynchronous set (should be fast)
1523        let start = std::time::Instant::now();
1524        if let Err(e) = prop.set_async(2) {
1525            panic!("Failed to set property value asynchronously: {}", e);
1526        }
1527        let async_duration = start.elapsed();
1528
1529        // Async should be much faster than sync
1530        assert!(async_duration < sync_duration);
1531        assert!(async_duration.as_millis() < 10); // Should be very fast
1532
1533        // Wait for async observer to complete
1534        thread::sleep(Duration::from_millis(100));
1535
1536        // Both observers should have been called
1537        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
1538    }
1539
1540    #[test]
1541    fn test_lock_poisoning() {
1542        // Create a property that we'll poison
1543        let prop = Arc::new(ObservableProperty::new(0));
1544        let prop_clone = prop.clone();
1545
1546        // Create a thread that will deliberately poison the lock
1547        let poison_thread = thread::spawn(move || {
1548            // Get write lock and then panic, which will poison the lock
1549            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
1550            panic!("Deliberate panic to poison the lock");
1551        });
1552
1553        // Wait for the thread to complete (it will panic)
1554        let _ = poison_thread.join();
1555
1556        // Now the lock should be poisoned, verify all operations return appropriate errors
1557        match prop.get() {
1558            Ok(_) => panic!("get() should fail on a poisoned lock"),
1559            Err(e) => match e {
1560                PropertyError::PoisonedLock => {} // Expected error
1561                _ => panic!("Expected PoisonedLock error, got: {:?}", e),
1562            },
1563        }
1564
1565        match prop.set(42) {
1566            Ok(_) => panic!("set() should fail on a poisoned lock"),
1567            Err(e) => match e {
1568                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
1569                _ => panic!("Expected lock-related error, got: {:?}", e),
1570            },
1571        }
1572
1573        match prop.subscribe(Arc::new(|_, _| {})) {
1574            Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
1575            Err(e) => match e {
1576                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
1577                _ => panic!("Expected lock-related error, got: {:?}", e),
1578            },
1579        }
1580    }
1581
1582    #[test]
1583    fn test_observer_panic_isolation() {
1584        let prop = ObservableProperty::new(0);
1585        let call_counts = Arc::new(AtomicUsize::new(0));
1586
1587        // First observer will panic
1588        let panic_observer_id = prop
1589            .subscribe(Arc::new(|_, _| {
1590                panic!("This observer deliberately panics");
1591            }))
1592            .expect("Failed to subscribe panic observer");
1593
1594        // Second observer should still be called despite first one panicking
1595        let counts = call_counts.clone();
1596        let normal_observer_id = prop
1597            .subscribe(Arc::new(move |_, _| {
1598                counts.fetch_add(1, Ordering::SeqCst);
1599            }))
1600            .expect("Failed to subscribe normal observer");
1601
1602        // Trigger the observers - this shouldn't panic despite the first observer panicking
1603        prop.set(42).expect("Failed to set property value");
1604
1605        // Verify the second observer was still called
1606        assert_eq!(call_counts.load(Ordering::SeqCst), 1);
1607
1608        // Clean up
1609        prop.unsubscribe(panic_observer_id).expect("Failed to unsubscribe panic observer");
1610        prop.unsubscribe(normal_observer_id).expect("Failed to unsubscribe normal observer");
1611    }
1612
1613    #[test]
1614    fn test_unsubscribe_nonexistent_observer() {
1615        let property = ObservableProperty::new(0);
1616
1617        // Generate a valid observer ID
1618        let valid_id = property.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe test observer");
1619
1620        // Create an ID that doesn't exist (valid_id + 1000 should not exist)
1621        let nonexistent_id = valid_id + 1000;
1622
1623        // Test unsubscribing a nonexistent observer
1624        match property.unsubscribe(nonexistent_id) {
1625            Ok(was_present) => {
1626                assert!(
1627                    !was_present,
1628                    "Unsubscribe should return false for nonexistent ID"
1629                );
1630            }
1631            Err(e) => panic!("Unsubscribe returned error: {:?}", e),
1632        }
1633
1634        // Also verify that unsubscribing twice returns false the second time
1635        property.unsubscribe(valid_id).expect("Failed first unsubscribe"); // First unsubscribe should return true
1636
1637        let result = property.unsubscribe(valid_id).expect("Failed second unsubscribe");
1638        assert!(!result, "Second unsubscribe should return false");
1639    }
1640
1641    #[test]
1642    fn test_observer_id_wraparound() {
1643        let prop = ObservableProperty::new(0);
1644
1645        // Test that observer IDs increment properly and don't wrap around unexpectedly
1646        let id1 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 1");
1647        let id2 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 2");
1648        let id3 = prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe observer 3");
1649
1650        assert!(id2 > id1, "Observer IDs should increment");
1651        assert!(id3 > id2, "Observer IDs should continue incrementing");
1652        assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
1653        assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");
1654
1655        // Clean up
1656        prop.unsubscribe(id1).expect("Failed to unsubscribe observer 1");
1657        prop.unsubscribe(id2).expect("Failed to unsubscribe observer 2");
1658        prop.unsubscribe(id3).expect("Failed to unsubscribe observer 3");
1659    }
1660
1661    #[test]
1662    fn test_concurrent_subscribe_unsubscribe() {
1663        let prop = Arc::new(ObservableProperty::new(0));
1664        let num_threads = 8;
1665        let operations_per_thread = 100;
1666
1667        let handles: Vec<_> = (0..num_threads)
1668            .map(|thread_id| {
1669                let prop_clone = prop.clone();
1670                thread::spawn(move || {
1671                    let mut local_ids = Vec::new();
1672
1673                    for i in 0..operations_per_thread {
1674                        // Subscribe an observer
1675                        let observer_id = prop_clone
1676                            .subscribe(Arc::new(move |old, new| {
1677                                // Do some work to simulate real observer
1678                                let _ = thread_id + i + old + new;
1679                            }))
1680                            .expect("Subscribe should succeed");
1681
1682                        local_ids.push(observer_id);
1683
1684                        // Occasionally unsubscribe some observers
1685                        if i % 10 == 0 && !local_ids.is_empty() {
1686                            let idx = i % local_ids.len();
1687                            let id_to_remove = local_ids.remove(idx);
1688                            prop_clone
1689                                .unsubscribe(id_to_remove)
1690                                .expect("Unsubscribe should succeed");
1691                        }
1692                    }
1693
1694                    // Clean up remaining observers
1695                    for id in local_ids {
1696                        prop_clone
1697                            .unsubscribe(id)
1698                            .expect("Final cleanup should succeed");
1699                    }
1700                })
1701            })
1702            .collect();
1703
1704        // Wait for all threads to complete
1705        for handle in handles {
1706            handle.join().expect("Thread should complete successfully");
1707        }
1708
1709        // Property should still be functional
1710        prop.set(42)
1711            .expect("Property should still work after concurrent operations");
1712    }
1713
1714    #[test]
1715    fn test_multiple_observer_panics_isolation() {
1716        let prop = ObservableProperty::new(0);
1717        let successful_calls = Arc::new(AtomicUsize::new(0));
1718
1719        // Create multiple observers that will panic
1720        let _panic_id1 = prop
1721            .subscribe(Arc::new(|_, _| {
1722                panic!("First panic observer");
1723            }))
1724            .expect("Failed to subscribe first panic observer");
1725
1726        let _panic_id2 = prop
1727            .subscribe(Arc::new(|_, _| {
1728                panic!("Second panic observer");
1729            }))
1730            .expect("Failed to subscribe second panic observer");
1731
1732        // Create observers that should succeed despite the panics
1733        let count1 = successful_calls.clone();
1734        let _success_id1 = prop
1735            .subscribe(Arc::new(move |_, _| {
1736                count1.fetch_add(1, Ordering::SeqCst);
1737            }))
1738            .expect("Failed to subscribe first success observer");
1739
1740        let count2 = successful_calls.clone();
1741        let _success_id2 = prop
1742            .subscribe(Arc::new(move |_, _| {
1743                count2.fetch_add(1, Ordering::SeqCst);
1744            }))
1745            .expect("Failed to subscribe second success observer");
1746
1747        // Trigger all observers
1748        prop.set(42).expect("Failed to set property value for panic isolation test");
1749
1750        // Both successful observers should have been called despite the panics
1751        assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
1752    }
1753
1754    #[test]
1755    fn test_async_observer_panic_isolation() {
1756        let prop = ObservableProperty::new(0);
1757        let successful_calls = Arc::new(AtomicUsize::new(0));
1758
1759        // Create observer that will panic
1760        let _panic_id = prop
1761            .subscribe(Arc::new(|_, _| {
1762                panic!("Async panic observer");
1763            }))
1764            .expect("Failed to subscribe async panic observer");
1765
1766        // Create observer that should succeed
1767        let count = successful_calls.clone();
1768        let _success_id = prop
1769            .subscribe(Arc::new(move |_, _| {
1770                count.fetch_add(1, Ordering::SeqCst);
1771            }))
1772            .expect("Failed to subscribe async success observer");
1773
1774        // Use async set to trigger observers in background threads
1775        prop.set_async(42).expect("Failed to set property value asynchronously");
1776
1777        // Wait for async observers to complete
1778        thread::sleep(Duration::from_millis(100));
1779
1780        // The successful observer should have been called despite the panic
1781        assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
1782    }
1783
1784    #[test]
1785    fn test_very_large_observer_count() {
1786        let prop = ObservableProperty::new(0);
1787        let total_calls = Arc::new(AtomicUsize::new(0));
1788        let observer_count = 1000;
1789
1790        // Subscribe many observers
1791        let mut observer_ids = Vec::with_capacity(observer_count);
1792        for i in 0..observer_count {
1793            let count = total_calls.clone();
1794            let id = prop
1795                .subscribe(Arc::new(move |old, new| {
1796                    count.fetch_add(1, Ordering::SeqCst);
1797                    // Verify we got the right values
1798                    assert_eq!(*old, 0);
1799                    assert_eq!(*new, i + 1);
1800                }))
1801                .expect("Failed to subscribe large observer count test observer");
1802            observer_ids.push(id);
1803        }
1804
1805        // Trigger all observers
1806        prop.set(observer_count).expect("Failed to set property value for large observer count test");
1807
1808        // All observers should have been called
1809        assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
1810
1811        // Clean up
1812        for id in observer_ids {
1813            prop.unsubscribe(id).expect("Failed to unsubscribe observer in large count test");
1814        }
1815    }
1816
1817    #[test]
1818    fn test_observer_with_mutable_state() {
1819        let prop = ObservableProperty::new(0);
1820        let call_history = Arc::new(RwLock::new(Vec::new()));
1821
1822        let history = call_history.clone();
1823        let observer_id = prop
1824            .subscribe(Arc::new(move |old, new| {
1825                if let Ok(mut hist) = history.write() {
1826                    hist.push((*old, *new));
1827                }
1828            }))
1829            .expect("Failed to subscribe mutable state observer");
1830
1831        // Make several changes
1832        prop.set(1).expect("Failed to set property to 1");
1833        prop.set(2).expect("Failed to set property to 2");
1834        prop.set(3).expect("Failed to set property to 3");
1835
1836        // Verify the history was recorded correctly
1837        let history = call_history.read().expect("Failed to read call history");
1838        assert_eq!(history.len(), 3);
1839        assert_eq!(history[0], (0, 1));
1840        assert_eq!(history[1], (1, 2));
1841        assert_eq!(history[2], (2, 3));
1842
1843        prop.unsubscribe(observer_id).expect("Failed to unsubscribe mutable state observer");
1844    }
1845
1846    #[test]
1847    fn test_subscription_automatic_cleanup() {
1848        let prop = ObservableProperty::new(0);
1849        let call_count = Arc::new(AtomicUsize::new(0));
1850
1851        // Test that subscription automatically cleans up when dropped
1852        {
1853            let count = call_count.clone();
1854            let _subscription = prop
1855                .subscribe_with_subscription(Arc::new(move |_, _| {
1856                    count.fetch_add(1, Ordering::SeqCst);
1857                }))
1858                .expect("Failed to create subscription for automatic cleanup test");
1859
1860            // Observer should be active while subscription is in scope
1861            prop.set(1).expect("Failed to set property value in subscription test");
1862            assert_eq!(call_count.load(Ordering::SeqCst), 1);
1863
1864            // Subscription goes out of scope here and should auto-cleanup
1865        }
1866
1867        // Observer should no longer be active after subscription dropped
1868        prop.set(2).expect("Failed to set property value after subscription dropped");
1869        assert_eq!(call_count.load(Ordering::SeqCst), 1); // No additional calls
1870    }
1871
1872    #[test]
1873    fn test_subscription_explicit_drop() {
1874        let prop = ObservableProperty::new(0);
1875        let call_count = Arc::new(AtomicUsize::new(0));
1876
1877        let count = call_count.clone();
1878        let subscription = prop
1879            .subscribe_with_subscription(Arc::new(move |_, _| {
1880                count.fetch_add(1, Ordering::SeqCst);
1881            }))
1882            .expect("Failed to create subscription for explicit drop test");
1883
1884        // Observer should be active
1885        prop.set(1).expect("Failed to set property value before explicit drop");
1886        assert_eq!(call_count.load(Ordering::SeqCst), 1);
1887
1888        // Explicitly drop the subscription
1889        drop(subscription);
1890
1891        // Observer should no longer be active
1892        prop.set(2).expect("Failed to set property value after explicit drop");
1893        assert_eq!(call_count.load(Ordering::SeqCst), 1);
1894    }
1895
1896    #[test]
1897    fn test_multiple_subscriptions_with_cleanup() {
1898        let prop = ObservableProperty::new(0);
1899        let call_count1 = Arc::new(AtomicUsize::new(0));
1900        let call_count2 = Arc::new(AtomicUsize::new(0));
1901        let call_count3 = Arc::new(AtomicUsize::new(0));
1902
1903        let count1 = call_count1.clone();
1904        let count2 = call_count2.clone();
1905        let count3 = call_count3.clone();
1906
1907        let subscription1 = prop
1908            .subscribe_with_subscription(Arc::new(move |_, _| {
1909                count1.fetch_add(1, Ordering::SeqCst);
1910            }))
1911            .expect("Failed to create first subscription");
1912
1913        let subscription2 = prop
1914            .subscribe_with_subscription(Arc::new(move |_, _| {
1915                count2.fetch_add(1, Ordering::SeqCst);
1916            }))
1917            .expect("Failed to create second subscription");
1918
1919        let subscription3 = prop
1920            .subscribe_with_subscription(Arc::new(move |_, _| {
1921                count3.fetch_add(1, Ordering::SeqCst);
1922            }))
1923            .expect("Failed to create third subscription");
1924
1925        // All observers should be active
1926        prop.set(1).expect("Failed to set property value with all subscriptions");
1927        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
1928        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
1929        assert_eq!(call_count3.load(Ordering::SeqCst), 1);
1930
1931        // Drop second subscription
1932        drop(subscription2);
1933
1934        // Only first and third should be active
1935        prop.set(2).expect("Failed to set property value with partial subscriptions");
1936        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
1937        assert_eq!(call_count2.load(Ordering::SeqCst), 1); // No change
1938        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
1939
1940        // Drop remaining subscriptions
1941        drop(subscription1);
1942        drop(subscription3);
1943
1944        // No observers should be active
1945        prop.set(3).expect("Failed to set property value with no subscriptions");
1946        assert_eq!(call_count1.load(Ordering::SeqCst), 2);
1947        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
1948        assert_eq!(call_count3.load(Ordering::SeqCst), 2);
1949    }
1950
1951    #[test]
1952    fn test_subscription_drop_with_poisoned_lock() {
1953        let prop = Arc::new(ObservableProperty::new(0));
1954        let prop_clone = prop.clone();
1955
1956        // Create a subscription
1957        let call_count = Arc::new(AtomicUsize::new(0));
1958        let count = call_count.clone();
1959        let subscription = prop
1960            .subscribe_with_subscription(Arc::new(move |_, _| {
1961                count.fetch_add(1, Ordering::SeqCst);
1962            }))
1963            .expect("Failed to create subscription for poisoned lock test");
1964
1965        // Poison the lock by panicking while holding a write lock
1966        let poison_thread = thread::spawn(move || {
1967            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
1968            panic!("Deliberate panic to poison the lock");
1969        });
1970        let _ = poison_thread.join(); // Ignore the panic result
1971
1972        // Dropping the subscription should not panic even with poisoned lock
1973        // This tests that the Drop implementation handles poisoned locks gracefully
1974        drop(subscription); // Should complete without panic
1975
1976        // Test passes if we reach here without panicking
1977    }
1978
1979    #[test]
1980    fn test_subscription_vs_manual_unsubscribe() {
1981        let prop = ObservableProperty::new(0);
1982        let auto_count = Arc::new(AtomicUsize::new(0));
1983        let manual_count = Arc::new(AtomicUsize::new(0));
1984
1985        // Manual subscription
1986        let manual_count_clone = manual_count.clone();
1987        let manual_id = prop
1988            .subscribe(Arc::new(move |_, _| {
1989                manual_count_clone.fetch_add(1, Ordering::SeqCst);
1990            }))
1991            .expect("Failed to create manual subscription");
1992
1993        // Automatic subscription
1994        let auto_count_clone = auto_count.clone();
1995        let _auto_subscription = prop
1996            .subscribe_with_subscription(Arc::new(move |_, _| {
1997                auto_count_clone.fetch_add(1, Ordering::SeqCst);
1998            }))
1999            .expect("Failed to create automatic subscription");
2000
2001        // Both should be active
2002        prop.set(1).expect("Failed to set property value with both subscriptions");
2003        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2004        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2005
2006        // Manual unsubscribe
2007        prop.unsubscribe(manual_id).expect("Failed to manually unsubscribe");
2008
2009        // Only automatic subscription should be active
2010        prop.set(2).expect("Failed to set property value after manual unsubscribe");
2011        assert_eq!(manual_count.load(Ordering::SeqCst), 1); // No change
2012        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2013
2014        // Auto subscription goes out of scope here and cleans up automatically
2015    }
2016
2017    #[test]
2018    fn test_subscribe_with_subscription_error_handling() {
2019        let prop = Arc::new(ObservableProperty::new(0));
2020        let prop_clone = prop.clone();
2021
2022        // Poison the lock
2023        let poison_thread = thread::spawn(move || {
2024            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for poisoning test");
2025            panic!("Deliberate panic to poison the lock");
2026        });
2027        let _ = poison_thread.join();
2028
2029        // subscribe_with_subscription should return an error for poisoned lock
2030        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2031        assert!(result.is_err());
2032        match result.expect_err("Expected error for poisoned lock") {
2033            PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {
2034                // Either error type is acceptable for poisoned lock
2035            }
2036            other => panic!("Unexpected error type: {:?}", other),
2037        }
2038    }
2039
2040    #[test]
2041    fn test_subscription_with_property_cloning() {
2042        let prop1 = ObservableProperty::new(0);
2043        let prop2 = prop1.clone();
2044        let call_count = Arc::new(AtomicUsize::new(0));
2045
2046        // Subscribe to prop1
2047        let count = call_count.clone();
2048        let _subscription = prop1
2049            .subscribe_with_subscription(Arc::new(move |_, _| {
2050                count.fetch_add(1, Ordering::SeqCst);
2051            }))
2052            .expect("Failed to create subscription for cloned property test");
2053
2054        // Changes through prop2 should trigger the observer subscribed to prop1
2055        prop2.set(42).expect("Failed to set property value through prop2");
2056        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2057
2058        // Changes through prop1 should also trigger the observer
2059        prop1.set(100).expect("Failed to set property value through prop1");
2060        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2061    }
2062
2063    #[test]
2064    fn test_subscription_in_conditional_blocks() {
2065        let prop = ObservableProperty::new(0);
2066        let call_count = Arc::new(AtomicUsize::new(0));
2067
2068        let should_subscribe = true;
2069
2070        if should_subscribe {
2071            let count = call_count.clone();
2072            let _subscription = prop
2073                .subscribe_with_subscription(Arc::new(move |_, _| {
2074                    count.fetch_add(1, Ordering::SeqCst);
2075                }))
2076                .expect("Failed to create subscription in conditional block");
2077
2078            // Observer active within this block
2079            prop.set(1).expect("Failed to set property value in conditional block");
2080            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2081
2082            // Subscription dropped when exiting this block
2083        }
2084
2085        // Observer should be inactive now
2086        prop.set(2).expect("Failed to set property value after conditional block");
2087        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2088    }
2089
2090    #[test]
2091    fn test_subscription_with_early_return() {
2092        fn test_function(
2093            prop: &ObservableProperty<i32>,
2094            should_return_early: bool,
2095        ) -> Result<(), PropertyError> {
2096            let call_count = Arc::new(AtomicUsize::new(0));
2097            let count = call_count.clone();
2098
2099            let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2100                count.fetch_add(1, Ordering::SeqCst);
2101            }))?;
2102
2103            prop.set(1)?;
2104            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2105
2106            if should_return_early {
2107                return Ok(()); // Subscription should be cleaned up here
2108            }
2109
2110            prop.set(2)?;
2111            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2112
2113            Ok(())
2114            // Subscription cleaned up when function exits normally
2115        }
2116
2117        let prop = ObservableProperty::new(0);
2118
2119        // Test early return
2120        test_function(&prop, true).expect("Failed to test early return");
2121
2122        // Verify observer is no longer active after early return
2123        prop.set(10).expect("Failed to set property value after early return");
2124
2125        // Test normal exit
2126        test_function(&prop, false).expect("Failed to test normal exit");
2127
2128        // Verify observer is no longer active after normal exit
2129        prop.set(20).expect("Failed to set property value after normal exit");
2130    }
2131
2132    #[test]
2133    fn test_subscription_move_semantics() {
2134        let prop = ObservableProperty::new(0);
2135        let call_count = Arc::new(AtomicUsize::new(0));
2136
2137        let count = call_count.clone();
2138        let subscription = prop
2139            .subscribe_with_subscription(Arc::new(move |_, _| {
2140                count.fetch_add(1, Ordering::SeqCst);
2141            }))
2142            .expect("Failed to create subscription for move semantics test");
2143
2144        // Observer should be active
2145        prop.set(1).expect("Failed to set property value before move");
2146        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2147
2148        // Move subscription to a new variable
2149        let moved_subscription = subscription;
2150
2151        // Observer should still be active after move
2152        prop.set(2).expect("Failed to set property value after move");
2153        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2154
2155        // Drop the moved subscription
2156        drop(moved_subscription);
2157
2158        // Observer should now be inactive
2159        prop.set(3).expect("Failed to set property value after moved subscription drop");
2160        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2161    }
2162
2163    #[test]
2164    fn test_filtered_subscription_automatic_cleanup() {
2165        let prop = ObservableProperty::new(0);
2166        let call_count = Arc::new(AtomicUsize::new(0));
2167
2168        {
2169            let count = call_count.clone();
2170            let _subscription = prop
2171                .subscribe_filtered_with_subscription(
2172                    Arc::new(move |_, _| {
2173                        count.fetch_add(1, Ordering::SeqCst);
2174                    }),
2175                    |old, new| new > old, // Only trigger on increases
2176                )
2177                .expect("Failed to create filtered subscription");
2178
2179            // Should trigger (0 -> 5)
2180            prop.set(5).expect("Failed to set property value to 5 in filtered test");
2181            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2182
2183            // Should NOT trigger (5 -> 3)
2184            prop.set(3).expect("Failed to set property value to 3 in filtered test");
2185            assert_eq!(call_count.load(Ordering::SeqCst), 1);
2186
2187            // Should trigger (3 -> 10)
2188            prop.set(10).expect("Failed to set property value to 10 in filtered test");
2189            assert_eq!(call_count.load(Ordering::SeqCst), 2);
2190
2191            // Subscription goes out of scope here
2192        }
2193
2194        // Observer should be inactive after subscription cleanup
2195        prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
2196        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2197    }
2198
2199    #[test]
2200    fn test_multiple_filtered_subscriptions() {
2201        let prop = ObservableProperty::new(10);
2202        let increase_count = Arc::new(AtomicUsize::new(0));
2203        let decrease_count = Arc::new(AtomicUsize::new(0));
2204        let significant_change_count = Arc::new(AtomicUsize::new(0));
2205
2206        let inc_count = increase_count.clone();
2207        let dec_count = decrease_count.clone();
2208        let sig_count = significant_change_count.clone();
2209
2210        let _increase_sub = prop
2211            .subscribe_filtered_with_subscription(
2212                Arc::new(move |_, _| {
2213                    inc_count.fetch_add(1, Ordering::SeqCst);
2214                }),
2215                |old, new| new > old,
2216            )
2217            .expect("Failed to create increase subscription");
2218
2219        let _decrease_sub = prop
2220            .subscribe_filtered_with_subscription(
2221                Arc::new(move |_, _| {
2222                    dec_count.fetch_add(1, Ordering::SeqCst);
2223                }),
2224                |old, new| new < old,
2225            )
2226            .expect("Failed to create decrease subscription");
2227
2228        let _significant_sub = prop
2229            .subscribe_filtered_with_subscription(
2230                Arc::new(move |_, _| {
2231                    sig_count.fetch_add(1, Ordering::SeqCst);
2232                }),
2233                |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
2234            )
2235            .expect("Failed to create significant change subscription");
2236
2237        // Test increases
2238        prop.set(15).expect("Failed to set property to 15 in multiple filtered test"); // +5: triggers increase, not significant
2239        assert_eq!(increase_count.load(Ordering::SeqCst), 1);
2240        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2241        assert_eq!(significant_change_count.load(Ordering::SeqCst), 0);
2242
2243        // Test significant increase
2244        prop.set(25).expect("Failed to set property to 25 in multiple filtered test"); // +10: triggers increase and significant
2245        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2246        assert_eq!(decrease_count.load(Ordering::SeqCst), 0);
2247        assert_eq!(significant_change_count.load(Ordering::SeqCst), 1);
2248
2249        // Test significant decrease
2250        prop.set(5).expect("Failed to set property to 5 in multiple filtered test"); // -20: triggers decrease and significant
2251        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2252        assert_eq!(decrease_count.load(Ordering::SeqCst), 1);
2253        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2254
2255        // Test small decrease
2256        prop.set(3).expect("Failed to set property to 3 in multiple filtered test"); // -2: triggers decrease, not significant
2257        assert_eq!(increase_count.load(Ordering::SeqCst), 2);
2258        assert_eq!(decrease_count.load(Ordering::SeqCst), 2);
2259        assert_eq!(significant_change_count.load(Ordering::SeqCst), 2);
2260
2261        // All subscriptions auto-cleanup when they go out of scope
2262    }
2263
2264    #[test]
2265    fn test_filtered_subscription_complex_filter() {
2266        let prop = ObservableProperty::new(0.0f64);
2267        let call_count = Arc::new(AtomicUsize::new(0));
2268        let values_received = Arc::new(RwLock::new(Vec::new()));
2269
2270        let count = call_count.clone();
2271        let values = values_received.clone();
2272        let _subscription = prop
2273            .subscribe_filtered_with_subscription(
2274                Arc::new(move |old, new| {
2275                    count.fetch_add(1, Ordering::SeqCst);
2276                    if let Ok(mut v) = values.write() {
2277                        v.push((*old, *new));
2278                    }
2279                }),
2280                |old, new| {
2281                    // Complex filter: trigger only when crossing integer boundaries
2282                    // and the change is significant (> 0.5)
2283                    let old_int = old.floor() as i32;
2284                    let new_int = new.floor() as i32;
2285                    old_int != new_int && (new - old).abs() > 0.5_f64
2286                },
2287            )
2288            .expect("Failed to create complex filtered subscription");
2289
2290        // Small changes within same integer - should not trigger
2291        prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
2292        prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
2293        assert_eq!(call_count.load(Ordering::SeqCst), 0);
2294
2295        // Cross integer boundary with significant change - should trigger
2296        prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test"); // Change of 0.6, which is > 0.5
2297        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2298
2299        // Small cross-boundary change - should not trigger
2300        prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
2301        prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test"); // Change of 0.2, less than 0.5
2302        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2303
2304        // Large cross-boundary change - should trigger
2305        prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
2306        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2307
2308        // Verify received values
2309        let values = values_received.read().expect("Failed to read values in complex filter test");
2310        assert_eq!(values.len(), 2);
2311        assert_eq!(values[0], (0.7, 1.3));
2312        assert_eq!(values[1], (2.1, 3.5));
2313    }
2314
2315    #[test]
2316    fn test_filtered_subscription_error_handling() {
2317        let prop = Arc::new(ObservableProperty::new(0));
2318        let prop_clone = prop.clone();
2319
2320        // Poison the lock
2321        let poison_thread = thread::spawn(move || {
2322            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
2323            panic!("Deliberate panic to poison the lock");
2324        });
2325        let _ = poison_thread.join();
2326
2327        // subscribe_filtered_with_subscription should return error for poisoned lock
2328        let result = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2329        assert!(result.is_err());
2330    }
2331
2332    #[test]
2333    fn test_filtered_subscription_vs_manual_filtered() {
2334        let prop = ObservableProperty::new(0);
2335        let auto_count = Arc::new(AtomicUsize::new(0));
2336        let manual_count = Arc::new(AtomicUsize::new(0));
2337
2338        // Manual filtered subscription
2339        let manual_count_clone = manual_count.clone();
2340        let manual_id = prop
2341            .subscribe_filtered(
2342                Arc::new(move |_, _| {
2343                    manual_count_clone.fetch_add(1, Ordering::SeqCst);
2344                }),
2345                |old, new| new > old,
2346            )
2347            .expect("Failed to create manual filtered subscription");
2348
2349        // Automatic filtered subscription
2350        let auto_count_clone = auto_count.clone();
2351        let _auto_subscription = prop
2352            .subscribe_filtered_with_subscription(
2353                Arc::new(move |_, _| {
2354                    auto_count_clone.fetch_add(1, Ordering::SeqCst);
2355                }),
2356                |old, new| new > old,
2357            )
2358            .expect("Failed to create automatic filtered subscription");
2359
2360        // Both should be triggered by increases
2361        prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
2362        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2363        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2364
2365        // Neither should be triggered by decreases
2366        prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
2367        assert_eq!(manual_count.load(Ordering::SeqCst), 1);
2368        assert_eq!(auto_count.load(Ordering::SeqCst), 1);
2369
2370        // Both should be triggered by increases again
2371        prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
2372        assert_eq!(manual_count.load(Ordering::SeqCst), 2);
2373        assert_eq!(auto_count.load(Ordering::SeqCst), 2);
2374
2375        // Manual cleanup
2376        prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");
2377
2378        // Only automatic subscription should be active
2379        prop.set(15).expect("Failed to set property to 15 after manual cleanup");
2380        assert_eq!(manual_count.load(Ordering::SeqCst), 2); // No change
2381        assert_eq!(auto_count.load(Ordering::SeqCst), 3);
2382
2383        // Auto subscription cleaned up when it goes out of scope
2384    }
2385
2386    #[test]
2387    fn test_filtered_subscription_with_panicking_filter() {
2388        let prop = ObservableProperty::new(0);
2389        let call_count = Arc::new(AtomicUsize::new(0));
2390
2391        let count = call_count.clone();
2392        let _subscription = prop
2393            .subscribe_filtered_with_subscription(
2394                Arc::new(move |_, _| {
2395                    count.fetch_add(1, Ordering::SeqCst);
2396                }),
2397                |_, new| {
2398                    if *new == 42 {
2399                        panic!("Filter panic on 42");
2400                    }
2401                    true // Accept all other values
2402                },
2403            )
2404            .expect("Failed to create panicking filter subscription");
2405
2406        // Normal value should work
2407        prop.set(1).expect("Failed to set property to 1 in panicking filter test");
2408        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2409
2410        // Value that causes filter to panic should be handled gracefully
2411        // The behavior here depends on how the filter panic is handled
2412        // In the current implementation, filter panics may cause the observer to not be called
2413        prop.set(42).expect("Failed to set property to 42 in panicking filter test");
2414
2415        // Observer should still work for subsequent normal values
2416        prop.set(2).expect("Failed to set property to 2 after filter panic");
2417        // Note: The exact count here depends on panic handling implementation
2418        // The important thing is that the system doesn't crash
2419    }
2420
2421    #[test]
2422    fn test_subscription_thread_safety() {
2423        let prop = Arc::new(ObservableProperty::new(0));
2424        let num_threads = 8;
2425        let operations_per_thread = 50;
2426        let total_calls = Arc::new(AtomicUsize::new(0));
2427
2428        let handles: Vec<_> = (0..num_threads)
2429            .map(|thread_id| {
2430                let prop_clone = prop.clone();
2431                let calls_clone = total_calls.clone();
2432
2433                thread::spawn(move || {
2434                    let mut local_subscriptions = Vec::new();
2435
2436                    for i in 0..operations_per_thread {
2437                        let calls = calls_clone.clone();
2438                        let subscription = prop_clone
2439                            .subscribe_with_subscription(Arc::new(move |old, new| {
2440                                calls.fetch_add(1, Ordering::SeqCst);
2441                                // Simulate some work
2442                                let _ = thread_id + i + old + new;
2443                            }))
2444                            .expect("Should be able to create subscription");
2445
2446                        local_subscriptions.push(subscription);
2447
2448                        // Trigger observers
2449                        prop_clone
2450                            .set(thread_id * 1000 + i)
2451                            .expect("Should be able to set value");
2452
2453                        // Occasionally drop some subscriptions
2454                        if i % 5 == 0 && !local_subscriptions.is_empty() {
2455                            local_subscriptions.remove(0); // Drop first subscription
2456                        }
2457                    }
2458
2459                    // All remaining subscriptions dropped when vector goes out of scope
2460                })
2461            })
2462            .collect();
2463
2464        // Wait for all threads to complete
2465        for handle in handles {
2466            handle.join().expect("Thread should complete successfully");
2467        }
2468
2469        // Property should still be functional after all the concurrent operations
2470        prop.set(9999).expect("Property should still work");
2471
2472        // We can't easily verify the exact call count due to the complex timing,
2473        // but we can verify that the system didn't crash and is still operational
2474        println!(
2475            "Total observer calls: {}",
2476            total_calls.load(Ordering::SeqCst)
2477        );
2478    }
2479
2480    #[test]
2481    fn test_subscription_cross_thread_drop() {
2482        let prop = Arc::new(ObservableProperty::new(0));
2483        let call_count = Arc::new(AtomicUsize::new(0));
2484
2485        // Create subscription in main thread
2486        let count = call_count.clone();
2487        let subscription = prop
2488            .subscribe_with_subscription(Arc::new(move |_, _| {
2489                count.fetch_add(1, Ordering::SeqCst);
2490            }))
2491            .expect("Failed to create subscription for cross-thread drop test");
2492
2493        // Verify observer is active
2494        prop.set(1).expect("Failed to set property value in cross-thread drop test");
2495        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2496
2497        // Move subscription to another thread and drop it there
2498        let prop_clone = prop.clone();
2499        let call_count_clone = call_count.clone();
2500
2501        let handle = thread::spawn(move || {
2502            // Verify observer is still active in the other thread
2503            prop_clone.set(2).expect("Failed to set property value in other thread");
2504            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2);
2505
2506            // Drop subscription in this thread
2507            drop(subscription);
2508
2509            // Verify observer is no longer active
2510            prop_clone.set(3).expect("Failed to set property value after drop in other thread");
2511            assert_eq!(call_count_clone.load(Ordering::SeqCst), 2); // No change
2512        });
2513
2514        handle.join().expect("Failed to join cross-thread drop test thread");
2515
2516        // Verify observer is still inactive in main thread
2517        prop.set(4).expect("Failed to set property value after thread join");
2518        assert_eq!(call_count.load(Ordering::SeqCst), 2);
2519    }
2520
2521    #[test]
2522    fn test_concurrent_subscription_creation_and_property_changes() {
2523        let prop = Arc::new(ObservableProperty::new(0));
2524        let total_notifications = Arc::new(AtomicUsize::new(0));
2525        let num_subscriber_threads = 4;
2526        let num_setter_threads = 2;
2527        let operations_per_thread = 25;
2528
2529        // Threads that create and destroy subscriptions
2530        let subscriber_handles: Vec<_> = (0..num_subscriber_threads)
2531            .map(|_| {
2532                let prop_clone = prop.clone();
2533                let notifications_clone = total_notifications.clone();
2534
2535                thread::spawn(move || {
2536                    for _ in 0..operations_per_thread {
2537                        let notifications = notifications_clone.clone();
2538                        let _subscription = prop_clone
2539                            .subscribe_with_subscription(Arc::new(move |_, _| {
2540                                notifications.fetch_add(1, Ordering::SeqCst);
2541                            }))
2542                            .expect("Should create subscription");
2543
2544                        // Keep subscription alive for a short time
2545                        thread::sleep(Duration::from_millis(1));
2546
2547                        // Subscription dropped when _subscription goes out of scope
2548                    }
2549                })
2550            })
2551            .collect();
2552
2553        // Threads that continuously change the property value
2554        let setter_handles: Vec<_> = (0..num_setter_threads)
2555            .map(|thread_id| {
2556                let prop_clone = prop.clone();
2557
2558                thread::spawn(move || {
2559                    for i in 0..operations_per_thread * 2 {
2560                        prop_clone
2561                            .set(thread_id * 10000 + i)
2562                            .expect("Should set value");
2563                        thread::sleep(Duration::from_millis(1));
2564                    }
2565                })
2566            })
2567            .collect();
2568
2569        // Wait for all threads to complete
2570        for handle in subscriber_handles
2571            .into_iter()
2572            .chain(setter_handles.into_iter())
2573        {
2574            handle.join().expect("Thread should complete");
2575        }
2576
2577        // System should be stable after concurrent operations
2578        prop.set(99999).expect("Property should still work");
2579
2580        println!(
2581            "Total notifications during concurrent test: {}",
2582            total_notifications.load(Ordering::SeqCst)
2583        );
2584    }
2585
2586    #[test]
2587    fn test_filtered_subscription_thread_safety() {
2588        let prop = Arc::new(ObservableProperty::new(0));
2589        let increase_notifications = Arc::new(AtomicUsize::new(0));
2590        let decrease_notifications = Arc::new(AtomicUsize::new(0));
2591        let num_threads = 6;
2592
2593        let handles: Vec<_> = (0..num_threads)
2594            .map(|thread_id| {
2595                let prop_clone = prop.clone();
2596                let inc_notifications = increase_notifications.clone();
2597                let dec_notifications = decrease_notifications.clone();
2598
2599                thread::spawn(move || {
2600                    // Create increase-only subscription
2601                    let inc_count = inc_notifications.clone();
2602                    let _inc_subscription = prop_clone
2603                        .subscribe_filtered_with_subscription(
2604                            Arc::new(move |_, _| {
2605                                inc_count.fetch_add(1, Ordering::SeqCst);
2606                            }),
2607                            |old, new| new > old,
2608                        )
2609                        .expect("Should create filtered subscription");
2610
2611                    // Create decrease-only subscription
2612                    let dec_count = dec_notifications.clone();
2613                    let _dec_subscription = prop_clone
2614                        .subscribe_filtered_with_subscription(
2615                            Arc::new(move |_, _| {
2616                                dec_count.fetch_add(1, Ordering::SeqCst);
2617                            }),
2618                            |old, new| new < old,
2619                        )
2620                        .expect("Should create filtered subscription");
2621
2622                    // Perform some property changes
2623                    let base_value = thread_id * 100;
2624                    for i in 0..20 {
2625                        let new_value = base_value + (i % 10); // Creates increases and decreases
2626                        prop_clone.set(new_value).expect("Should set value");
2627                        thread::sleep(Duration::from_millis(1));
2628                    }
2629
2630                    // Subscriptions automatically cleaned up when going out of scope
2631                })
2632            })
2633            .collect();
2634
2635        // Wait for all threads
2636        for handle in handles {
2637            handle.join().expect("Thread should complete");
2638        }
2639
2640        // Verify system is still operational
2641        let initial_inc = increase_notifications.load(Ordering::SeqCst);
2642        let initial_dec = decrease_notifications.load(Ordering::SeqCst);
2643
2644        prop.set(1000).expect("Property should still work");
2645        prop.set(2000).expect("Property should still work");
2646
2647        // No new notifications should occur (all subscriptions cleaned up)
2648        assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
2649        assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);
2650
2651        println!(
2652            "Increase notifications: {}, Decrease notifications: {}",
2653            initial_inc, initial_dec
2654        );
2655    }
2656
2657    #[test]
2658    fn test_subscription_with_async_property_changes() {
2659        let prop = Arc::new(ObservableProperty::new(0));
2660        let sync_notifications = Arc::new(AtomicUsize::new(0));
2661        let async_notifications = Arc::new(AtomicUsize::new(0));
2662
2663        // Subscription that tracks sync notifications
2664        let sync_count = sync_notifications.clone();
2665        let _sync_subscription = prop
2666            .subscribe_with_subscription(Arc::new(move |old, new| {
2667                sync_count.fetch_add(1, Ordering::SeqCst);
2668                // Simulate slow observer work
2669                thread::sleep(Duration::from_millis(5));
2670                println!("Sync observer: {} -> {}", old, new);
2671            }))
2672            .expect("Failed to create sync subscription");
2673
2674        // Subscription that tracks async notifications
2675        let async_count = async_notifications.clone();
2676        let _async_subscription = prop
2677            .subscribe_with_subscription(Arc::new(move |old, new| {
2678                async_count.fetch_add(1, Ordering::SeqCst);
2679                println!("Async observer: {} -> {}", old, new);
2680            }))
2681            .expect("Failed to create async subscription");
2682
2683        // Test sync property changes
2684        let start = std::time::Instant::now();
2685        prop.set(1).expect("Failed to set property value 1 in async test");
2686        prop.set(2).expect("Failed to set property value 2 in async test");
2687        let sync_duration = start.elapsed();
2688
2689        // Test async property changes
2690        let start = std::time::Instant::now();
2691        prop.set_async(3).expect("Failed to set property value 3 async");
2692        prop.set_async(4).expect("Failed to set property value 4 async");
2693        let async_duration = start.elapsed();
2694
2695        // Async should be much faster
2696        assert!(async_duration < sync_duration);
2697
2698        // Wait for async observers to complete
2699        thread::sleep(Duration::from_millis(50));
2700
2701        // All observers should have been called
2702        assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
2703        assert_eq!(async_notifications.load(Ordering::SeqCst), 4);
2704
2705        // Subscriptions auto-cleanup when going out of scope
2706    }
2707
2708    #[test]
2709    fn test_subscription_creation_with_poisoned_lock() {
2710        let prop = Arc::new(ObservableProperty::new(0));
2711        let prop_clone = prop.clone();
2712
2713        // Create a valid subscription before poisoning
2714        let call_count = Arc::new(AtomicUsize::new(0));
2715        let count = call_count.clone();
2716        let existing_subscription = prop
2717            .subscribe_with_subscription(Arc::new(move |_, _| {
2718                count.fetch_add(1, Ordering::SeqCst);
2719            }))
2720            .expect("Failed to create subscription before poisoning");
2721
2722        // Poison the lock
2723        let poison_thread = thread::spawn(move || {
2724            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for subscription poison test");
2725            panic!("Deliberate panic to poison the lock");
2726        });
2727        let _ = poison_thread.join();
2728
2729        // New subscription creation should fail
2730        let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
2731        assert!(result.is_err());
2732
2733        // New filtered subscription creation should also fail
2734        let filtered_result =
2735            prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
2736        assert!(filtered_result.is_err());
2737
2738        // Dropping existing subscription should not panic
2739        drop(existing_subscription);
2740    }
2741
2742    #[test]
2743    fn test_subscription_cleanup_behavior_with_poisoned_lock() {
2744        // This test specifically verifies that Drop doesn't panic with poisoned locks
2745        let prop = Arc::new(ObservableProperty::new(0));
2746        let call_count = Arc::new(AtomicUsize::new(0));
2747
2748        // Create subscription
2749        let count = call_count.clone();
2750        let subscription = prop
2751            .subscribe_with_subscription(Arc::new(move |_, _| {
2752                count.fetch_add(1, Ordering::SeqCst);
2753            }))
2754            .expect("Failed to create subscription for cleanup behavior test");
2755
2756        // Verify it works initially
2757        prop.set(1).expect("Failed to set property value in cleanup behavior test");
2758        assert_eq!(call_count.load(Ordering::SeqCst), 1);
2759
2760        // Poison the lock from another thread
2761        let prop_clone = prop.clone();
2762        let poison_thread = thread::spawn(move || {
2763            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for cleanup behavior poison test");
2764            panic!("Deliberate panic to poison the lock");
2765        });
2766        let _ = poison_thread.join();
2767
2768        // Now drop the subscription - this should NOT panic
2769        // The Drop implementation should handle the poisoned lock gracefully
2770        drop(subscription);
2771
2772        // Test succeeds if we reach this point without panicking
2773    }
2774
2775    #[test]
2776    fn test_multiple_subscription_cleanup_with_poisoned_lock() {
2777        let prop = Arc::new(ObservableProperty::new(0));
2778        let mut subscriptions = Vec::new();
2779
2780        // Create multiple subscriptions
2781        for i in 0..5 {
2782            let call_count = Arc::new(AtomicUsize::new(0));
2783            let count = call_count.clone();
2784            let subscription = prop
2785                .subscribe_with_subscription(Arc::new(move |old, new| {
2786                    count.fetch_add(1, Ordering::SeqCst);
2787                    println!("Observer {}: {} -> {}", i, old, new);
2788                }))
2789                .expect("Failed to create subscription in multiple cleanup test");
2790            subscriptions.push(subscription);
2791        }
2792
2793        // Verify they all work
2794        prop.set(42).expect("Failed to set property value in multiple cleanup test");
2795
2796        // Poison the lock
2797        let prop_clone = prop.clone();
2798        let poison_thread = thread::spawn(move || {
2799            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for multiple cleanup poison test");
2800            panic!("Deliberate panic to poison the lock");
2801        });
2802        let _ = poison_thread.join();
2803
2804        // Drop all subscriptions - none should panic
2805        for subscription in subscriptions {
2806            drop(subscription);
2807        }
2808
2809        // Test succeeds if no panics occurred
2810    }
2811
2812    #[test]
2813    fn test_subscription_behavior_before_and_after_poison() {
2814        let prop = Arc::new(ObservableProperty::new(0));
2815        let before_poison_count = Arc::new(AtomicUsize::new(0));
2816        let after_poison_count = Arc::new(AtomicUsize::new(0));
2817
2818        // Create subscription before poisoning
2819        let before_count = before_poison_count.clone();
2820        let before_subscription = prop
2821            .subscribe_with_subscription(Arc::new(move |_, _| {
2822                before_count.fetch_add(1, Ordering::SeqCst);
2823            }))
2824            .expect("Failed to create subscription before poison test");
2825
2826        // Verify it works
2827        prop.set(1).expect("Failed to set property value before poison test");
2828        assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);
2829
2830        // Poison the lock
2831        let prop_clone = prop.clone();
2832        let poison_thread = thread::spawn(move || {
2833            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for before/after poison test");
2834            panic!("Deliberate panic to poison the lock");
2835        });
2836        let _ = poison_thread.join();
2837
2838        // Try to create subscription after poisoning - should fail
2839        let after_count = after_poison_count.clone();
2840        let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
2841            after_count.fetch_add(1, Ordering::SeqCst);
2842        }));
2843        assert!(after_result.is_err());
2844
2845        // Clean up the before-poison subscription - should not panic
2846        drop(before_subscription);
2847
2848        // Verify after-poison subscription was never created
2849        assert_eq!(after_poison_count.load(Ordering::SeqCst), 0);
2850    }
2851
2852    #[test]
2853    fn test_concurrent_subscription_drops_with_poison() {
2854        let prop = Arc::new(ObservableProperty::new(0));
2855        let num_subscriptions = 10;
2856        let mut subscriptions = Vec::new();
2857
2858        // Create multiple subscriptions
2859        for i in 0..num_subscriptions {
2860            let call_count = Arc::new(AtomicUsize::new(0));
2861            let count = call_count.clone();
2862            let subscription = prop
2863                .subscribe_with_subscription(Arc::new(move |_, _| {
2864                    count.fetch_add(1, Ordering::SeqCst);
2865                    println!("Observer {}", i);
2866                }))
2867                .expect("Failed to create subscription in concurrent drops test");
2868            subscriptions.push(subscription);
2869        }
2870
2871        // Poison the lock
2872        let prop_clone = prop.clone();
2873        let poison_thread = thread::spawn(move || {
2874            let _guard = prop_clone.inner.write().expect("Failed to acquire write lock for concurrent drops poison test");
2875            panic!("Deliberate panic to poison the lock");
2876        });
2877        let _ = poison_thread.join();
2878
2879        // Drop subscriptions concurrently from multiple threads
2880        let handles: Vec<_> = subscriptions
2881            .into_iter()
2882            .enumerate()
2883            .map(|(i, subscription)| {
2884                thread::spawn(move || {
2885                    // Add some randomness to timing
2886                    thread::sleep(Duration::from_millis(i as u64 % 5));
2887                    drop(subscription);
2888                    println!("Dropped subscription {}", i);
2889                })
2890            })
2891            .collect();
2892
2893        // Wait for all drops to complete
2894        for handle in handles {
2895            handle
2896                .join()
2897                .expect("Drop thread should complete without panic");
2898        }
2899
2900        // Test succeeds if all threads completed successfully
2901    }
2902}