observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
15//!
16//! ## Quick Start
17//!
18//! ```rust
19//! use observable_property::ObservableProperty;
20//! use std::sync::Arc;
21//!
22//! // Create an observable property
23//! let property = ObservableProperty::new(42);
24//!
25//! // Subscribe to changes
26//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
27//! println!("Value changed from {} to {}", old_value, new_value);
28//! })).map_err(|e| {
29//! eprintln!("Failed to subscribe: {}", e);
30//! e
31//! })?;
32//!
33//! // Change the value (triggers observer)
34//! property.set(100).map_err(|e| {
35//! eprintln!("Failed to set value: {}", e);
36//! e
37//! })?;
38//!
39//! // Unsubscribe when done
40//! property.unsubscribe(observer_id).map_err(|e| {
41//! eprintln!("Failed to unsubscribe: {}", e);
42//! e
43//! })?;
44//! # Ok::<(), observable_property::PropertyError>(())
45//! ```
46//!
47//! ## Multi-threading Example
48//!
49//! ```rust
50//! use observable_property::ObservableProperty;
51//! use std::sync::Arc;
52//! use std::thread;
53//!
54//! let property = Arc::new(ObservableProperty::new(0));
55//! let property_clone = property.clone();
56//!
57//! // Subscribe from one thread
58//! property.subscribe(Arc::new(|old, new| {
59//! println!("Value changed: {} -> {}", old, new);
60//! })).map_err(|e| {
61//! eprintln!("Failed to subscribe: {}", e);
62//! e
63//! })?;
64//!
65//! // Modify from another thread
66//! thread::spawn(move || {
67//! if let Err(e) = property_clone.set(42) {
68//! eprintln!("Failed to set value: {}", e);
69//! }
70//! }).join().expect("Thread panicked");
71//! # Ok::<(), observable_property::PropertyError>(())
72//! ```
73//!
74//! ## RAII Subscriptions (Recommended)
75//!
76//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
77//!
78//! ```rust
79//! use observable_property::ObservableProperty;
80//! use std::sync::Arc;
81//!
82//! # fn main() -> Result<(), observable_property::PropertyError> {
83//! let property = ObservableProperty::new(0);
84//!
85//! {
86//! // Create RAII subscription - automatically cleaned up when dropped
87//! let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
88//! println!("Value changed: {} -> {}", old, new);
89//! }))?;
90//!
91//! property.set(42)?; // Prints: "Value changed: 0 -> 42"
92//!
93//! // Subscription automatically unsubscribes when leaving this scope
94//! }
95//!
96//! // No observer active anymore
97//! property.set(100)?; // No output
98//! # Ok(())
99//! # }
100//! ```
101//!
102//! ## Filtered RAII Subscriptions
103//!
104//! Combine filtering with automatic cleanup for conditional monitoring:
105//!
106//! ```rust
107//! use observable_property::ObservableProperty;
108//! use std::sync::Arc;
109//!
110//! # fn main() -> Result<(), observable_property::PropertyError> {
111//! let temperature = ObservableProperty::new(20.0);
112//!
113//! {
114//! // Monitor only significant temperature increases with automatic cleanup
115//! let _heat_warning = temperature.subscribe_filtered_with_subscription(
116//! Arc::new(|old, new| {
117//! println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
118//! }),
119//! |old, new| new > old && (new - old) > 5.0
120//! )?;
121//!
122//! temperature.set(22.0)?; // No warning (only 2°C increase)
123//! temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
124//!
125//! // Subscription automatically cleaned up here
126//! }
127//!
128//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
129//! # Ok(())
130//! # }
131//! ```
132//!
133//! ## Subscription Management Comparison
134//!
135//! ```rust
136//! use observable_property::ObservableProperty;
137//! use std::sync::Arc;
138//!
139//! # fn main() -> Result<(), observable_property::PropertyError> {
140//! let property = ObservableProperty::new(0);
141//! let observer = Arc::new(|old: &i32, new: &i32| {
142//! println!("Value: {} -> {}", old, new);
143//! });
144//!
145//! // Method 1: Manual subscription management
146//! let observer_id = property.subscribe(observer.clone())?;
147//! property.set(42)?;
148//! property.unsubscribe(observer_id)?; // Manual cleanup required
149//!
150//! // Method 2: RAII subscription management (recommended)
151//! {
152//! let _subscription = property.subscribe_with_subscription(observer)?;
153//! property.set(100)?;
154//! // Automatic cleanup when _subscription goes out of scope
155//! }
156//! # Ok(())
157//! # }
158//! ```
159//!
160//! ## Advanced RAII Patterns
161//!
162//! Comprehensive example showing various RAII subscription patterns:
163//!
164//! ```rust
165//! use observable_property::ObservableProperty;
166//! use std::sync::Arc;
167//!
168//! # fn main() -> Result<(), observable_property::PropertyError> {
169//! // System monitoring example
170//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
171//! let memory_usage = ObservableProperty::new(1024); // MB
172//! let active_connections = ObservableProperty::new(0u32);
173//!
174//! // Conditional monitoring based on system state
175//! let high_load_monitoring = cpu_usage.get()? > 50.0;
176//!
177//! if high_load_monitoring {
178//! // Critical system monitoring - active only during high load
179//! let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
180//! Arc::new(|old, new| {
181//! println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
182//! }),
183//! |_, new| *new > 80.0
184//! )?;
185//!
186//! let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
187//! Arc::new(|old, new| {
188//! println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
189//! }),
190//! |_, new| *new > 8192 // > 8GB
191//! )?;
192//!
193//! // Simulate system load changes
194//! cpu_usage.set(85.0)?; // Would trigger critical alert
195//! memory_usage.set(9216)?; // Would trigger memory warning
196//!
197//! // All monitoring automatically stops when exiting this block
198//! }
199//!
200//! // Connection monitoring with scoped lifetime
201//! {
202//! let _connection_monitor = active_connections.subscribe_with_subscription(
203//! Arc::new(|old, new| {
204//! if new > old {
205//! println!("📈 New connections: {} -> {}", old, new);
206//! } else if new < old {
207//! println!("📉 Connections closed: {} -> {}", old, new);
208//! }
209//! })
210//! )?;
211//!
212//! active_connections.set(5)?; // Prints: "📈 New connections: 0 -> 5"
213//! active_connections.set(3)?; // Prints: "📉 Connections closed: 5 -> 3"
214//! active_connections.set(8)?; // Prints: "📈 New connections: 3 -> 8"
215//!
216//! // Connection monitoring automatically stops here
217//! }
218//!
219//! // No monitoring active anymore
220//! cpu_usage.set(95.0)?; // No output
221//! memory_usage.set(10240)?; // No output
222//! active_connections.set(15)?; // No output
223//!
224//! println!("All monitoring automatically cleaned up!");
225//! # Ok(())
226//! # }
227//! ```
228
229use std::collections::HashMap;
230use std::fmt;
231use std::panic;
232use std::sync::{Arc, RwLock};
233use std::thread;
234
/// Default upper bound on the number of background threads spawned per `set_async()` call
///
/// `ObservableProperty::new()` stores this value as the property's thread limit, and
/// `ObservableProperty::with_max_threads()` falls back to it when given `0`. A property
/// created through `with_max_threads()` with a non-zero argument uses that argument
/// instead of this constant.
///
/// # Rationale
///
/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead
/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
/// - **System Responsiveness**: Limits thread contention on multi-core systems
///
/// # Implementation Details
///
/// When `set_async()` is called:
/// 1. All observers are collected into a snapshot
/// 2. The snapshot is split into chunks of `ceil(observer_count / limit)` observers,
///    which yields at most `limit` batches (fewer if there are fewer observers)
/// 3. Each batch executes in its own spawned thread
/// 4. Observers within each batch are executed sequentially
///
/// For example, with 100 observers and the default limit of 4, four threads are
/// spawned and each one runs 25 observers. Observers are stored in a `HashMap`, so
/// which observers end up sharing a batch follows the map's iteration order and
/// should be treated as unspecified.
///
/// # Tuning Considerations
///
/// This constant is private; callers tune the limit per property through
/// `with_max_threads()`:
/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
/// - **Memory-constrained systems**: Lower values reduce thread overhead
/// - **Real-time systems**: Lower values reduce scheduling unpredictability
///
/// # Thread Safety
///
/// The limit is only an input to the batching calculation; it is never mutated after
/// a property has been constructed.
const MAX_THREADS: usize = 4;
/// Errors that can occur when working with ObservableProperty
///
/// The type implements `PartialEq`/`Eq` so that callers and tests can compare errors
/// directly (for example with `assert_eq!`) instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property
    ReadLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Failed to acquire a write lock on the property
    WriteLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Attempted to unsubscribe an observer that doesn't exist
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: usize,
    },
    /// The property's lock has been poisoned due to a panic in another thread
    PoisonedLock,
    /// An observer function encountered an error during execution
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },
    /// The thread pool for async notifications is exhausted
    ThreadPoolExhausted,
    /// Invalid configuration was provided
    InvalidConfiguration {
        /// Description of the invalid configuration
        reason: String,
    },
}
308
309impl fmt::Display for PropertyError {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 match self {
312 PropertyError::ReadLockError { context } => {
313 write!(f, "Failed to acquire read lock: {}", context)
314 }
315 PropertyError::WriteLockError { context } => {
316 write!(f, "Failed to acquire write lock: {}", context)
317 }
318 PropertyError::ObserverNotFound { id } => {
319 write!(f, "Observer with ID {} not found", id)
320 }
321 PropertyError::PoisonedLock => {
322 write!(
323 f,
324 "Property is in a poisoned state due to a panic in another thread"
325 )
326 }
327 PropertyError::ObserverError { reason } => {
328 write!(f, "Observer execution failed: {}", reason)
329 }
330 PropertyError::ThreadPoolExhausted => {
331 write!(f, "Thread pool is exhausted and cannot spawn more observers")
332 }
333 PropertyError::InvalidConfiguration { reason } => {
334 write!(f, "Invalid configuration: {}", reason)
335 }
336 }
337 }
338}
339
// Marker impl: `PropertyError` relies entirely on the trait's default methods, so the
// body is empty. The required `Debug` and `Display` impls are the derive on the enum
// and the hand-written `fmt::Display` impl.
impl std::error::Error for PropertyError {}
341
/// Function type for observers that get called when property values change
///
/// The callback is invoked as `observer(&old_value, &new_value)`: the first argument is
/// the value before the change, the second the value after it. It is stored behind an
/// `Arc` so the same callback can be cheaply cloned into notification snapshots, and it
/// must be `Send + Sync` because `set_async()` runs observers on background threads.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
344
/// Unique identifier for registered observers
///
/// IDs are handed out by `subscribe()` from a per-property counter that starts at `0`
/// and increases by one per subscription, so an ID is only meaningful for the property
/// that issued it.
pub type ObserverId = usize;
347
348/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
349///
350/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
351/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
352/// `Drop` implementation automatically removes the associated observer from the property.
353///
354/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
355/// leaks in scenarios where observers might otherwise be forgotten.
356///
357/// # Type Requirements
358///
359/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
360/// - `Clone`: Required for observer notifications
361/// - `Send`: Required for transferring between threads
362/// - `Sync`: Required for concurrent access from multiple threads
363/// - `'static`: Required for observer callbacks that may outlive the original scope
364///
365/// # Examples
366///
367/// ## Basic RAII Subscription
368///
369/// ```rust
370/// use observable_property::ObservableProperty;
371/// use std::sync::Arc;
372///
373/// # fn main() -> Result<(), observable_property::PropertyError> {
374/// let property = ObservableProperty::new(0);
375///
376/// {
377/// // Create subscription - observer is automatically registered
378/// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
379/// println!("Value changed: {} -> {}", old, new);
380/// }))?;
381///
382/// property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
383///
384/// // When _subscription goes out of scope here, observer is automatically removed
385/// }
386///
387/// property.set(100)?; // No observer output - subscription was automatically cleaned up
388/// # Ok(())
389/// # }
390/// ```
391///
392/// ## Cross-Thread Subscription Management
393///
394/// ```rust
395/// use observable_property::ObservableProperty;
396/// use std::sync::Arc;
397/// use std::thread;
398///
399/// # fn main() -> Result<(), observable_property::PropertyError> {
400/// let property = Arc::new(ObservableProperty::new(0));
401/// let property_clone = property.clone();
402///
403/// // Create subscription in main thread
404/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
405/// println!("Observed: {} -> {}", old, new);
406/// }))?;
407///
408/// // Move subscription to another thread for cleanup
409/// let handle = thread::spawn(move || {
410/// // Subscription is still active here
411/// let _ = property_clone.set(42); // Will trigger observer
412///
413/// // When subscription is dropped here (end of thread), observer is cleaned up
414/// drop(subscription);
415/// });
416///
417/// handle.join().unwrap();
418///
419/// // Observer is no longer active
420/// property.set(100)?; // No output
421/// # Ok(())
422/// # }
423/// ```
424///
425/// ## Conditional Scoped Subscriptions
426///
427/// ```rust
428/// use observable_property::ObservableProperty;
429/// use std::sync::Arc;
430///
431/// # fn main() -> Result<(), observable_property::PropertyError> {
432/// let counter = ObservableProperty::new(0);
433/// let debug_mode = true;
434///
435/// if debug_mode {
436/// let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
437/// println!("Debug: counter {} -> {}", old, new);
438/// }))?;
439///
440/// counter.set(1)?; // Prints debug info
441/// counter.set(2)?; // Prints debug info
442///
443/// // Debug subscription automatically cleaned up when exiting if block
444/// }
445///
446/// counter.set(3)?; // No debug output (subscription was cleaned up)
447/// # Ok(())
448/// # }
449/// ```
450///
451/// # Thread Safety
452///
453/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
454/// sent between threads and the automatic cleanup will work correctly even if the
455/// subscription is dropped from a different thread than where it was created.
456pub struct Subscription<T: Clone + Send + Sync + 'static> {
457 inner: Arc<RwLock<InnerProperty<T>>>,
458 id: ObserverId,
459}
460
461impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
462 /// Debug implementation that shows the subscription ID without exposing internals
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 f.debug_struct("Subscription")
465 .field("id", &self.id)
466 .field("inner", &"[ObservableProperty]")
467 .finish()
468 }
469}
470
471impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
472 /// Automatically removes the associated observer when the subscription is dropped
473 ///
474 /// This implementation provides automatic cleanup by removing the observer
475 /// from the property's observer list when the `Subscription` goes out of scope.
476 ///
477 /// # Error Handling
478 ///
479 /// If the property's lock is poisoned or inaccessible during cleanup, the error
480 /// is silently ignored using the `let _ = ...` pattern. This is intentional
481 /// because:
482 /// 1. Drop implementations should not panic
483 /// 2. If the property is poisoned, it's likely unusable anyway
484 /// 3. There's no meaningful way to handle cleanup errors in a destructor
485 ///
486 /// # Thread Safety
487 ///
488 /// This method is safe to call from any thread, even if the subscription
489 /// was created on a different thread.
490 fn drop(&mut self) {
491 let _ = self.inner.write().map(|mut prop| {
492 prop.observers.remove(&self.id);
493 });
494 }
495}
496
497/// A thread-safe observable property that notifies observers when its value changes
498///
499/// This type wraps a value of type `T` and allows multiple observers to be notified
500/// whenever the value is modified. All operations are thread-safe and can be called
501/// from multiple threads concurrently.
502///
503/// # Type Requirements
504///
505/// The generic type `T` must implement:
506/// - `Clone`: Required for returning values and passing them to observers
507/// - `Send`: Required for transferring between threads
508/// - `Sync`: Required for concurrent access from multiple threads
509/// - `'static`: Required for observer callbacks that may outlive the original scope
510///
511/// # Examples
512///
513/// ```rust
514/// use observable_property::ObservableProperty;
515/// use std::sync::Arc;
516///
517/// let property = ObservableProperty::new("initial".to_string());
518///
519/// let observer_id = property.subscribe(Arc::new(|old, new| {
520/// println!("Changed from '{}' to '{}'", old, new);
521/// })).map_err(|e| {
522/// eprintln!("Failed to subscribe: {}", e);
523/// e
524/// })?;
525///
526/// property.set("updated".to_string()).map_err(|e| {
527/// eprintln!("Failed to set value: {}", e);
528/// e
529/// })?; // Prints: Changed from 'initial' to 'updated'
530///
531/// property.unsubscribe(observer_id).map_err(|e| {
532/// eprintln!("Failed to unsubscribe: {}", e);
533/// e
534/// })?;
535/// # Ok::<(), observable_property::PropertyError>(())
536/// ```
pub struct ObservableProperty<T> {
    /// Current value, registered observers and the ID counter, shared behind one lock.
    /// `Subscription` holds the same `Arc<RwLock<..>>` type — presumably a clone of
    /// this handle; confirm in `subscribe_with_subscription`.
    inner: Arc<RwLock<InnerProperty<T>>>,
    /// Upper bound on the number of batches (and thus threads) used by `set_async()`.
    /// `new()` sets it to `MAX_THREADS`; `with_max_threads()` lets the caller choose.
    max_threads: usize,
}
541
// Lock-protected state of an `ObservableProperty`; only ever accessed through the
// surrounding `RwLock`.
struct InnerProperty<T> {
    // The value currently stored in the property.
    value: T,
    // Registered observers, keyed by the ID returned from `subscribe()`.
    observers: HashMap<ObserverId, Observer<T>>,
    // ID that the next `subscribe()` call will hand out; starts at 0 and only grows.
    next_id: ObserverId,
}
547
548impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
549 /// Creates a new observable property with the given initial value
550 ///
551 /// # Arguments
552 ///
553 /// * `initial_value` - The starting value for this property
554 ///
555 /// # Examples
556 ///
557 /// ```rust
558 /// use observable_property::ObservableProperty;
559 ///
560 /// let property = ObservableProperty::new(42);
561 /// match property.get() {
562 /// Ok(value) => assert_eq!(value, 42),
563 /// Err(e) => eprintln!("Failed to get property value: {}", e),
564 /// }
565 /// ```
566 pub fn new(initial_value: T) -> Self {
567 Self {
568 inner: Arc::new(RwLock::new(InnerProperty {
569 value: initial_value,
570 observers: HashMap::new(),
571 next_id: 0,
572 })),
573 max_threads: MAX_THREADS,
574 }
575 }
576
577 pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
578 let max_threads = if max_threads == 0 {
579 MAX_THREADS
580 } else {
581 max_threads
582 };
583 Self {
584 inner: Arc::new(RwLock::new(InnerProperty {
585 value: initial_value,
586 observers: HashMap::new(),
587 next_id: 0,
588 })),
589 max_threads,
590 }
591 }
592
593 /// Gets the current value of the property
594 ///
595 /// This method acquires a read lock, which allows multiple concurrent readers
596 /// but will block if a writer currently holds the lock.
597 ///
598 /// # Returns
599 ///
600 /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
601 /// if the lock is poisoned.
602 ///
603 /// # Examples
604 ///
605 /// ```rust
606 /// use observable_property::ObservableProperty;
607 ///
608 /// let property = ObservableProperty::new("hello".to_string());
609 /// match property.get() {
610 /// Ok(value) => assert_eq!(value, "hello"),
611 /// Err(e) => eprintln!("Failed to get property value: {}", e),
612 /// }
613 /// ```
614 pub fn get(&self) -> Result<T, PropertyError> {
615 self.inner
616 .read()
617 .map(|prop| prop.value.clone())
618 .map_err(|_| PropertyError::PoisonedLock)
619 }
620
621 /// Sets the property to a new value and notifies all observers
622 ///
623 /// This method will:
624 /// 1. Acquire a write lock (blocking other readers/writers)
625 /// 2. Update the value and capture a snapshot of observers
626 /// 3. Release the lock
627 /// 4. Notify all observers sequentially with the old and new values
628 ///
629 /// Observer notifications are wrapped in panic recovery to prevent one
630 /// misbehaving observer from affecting others.
631 ///
632 /// # Arguments
633 ///
634 /// * `new_value` - The new value to set
635 ///
636 /// # Returns
637 ///
638 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
639 ///
640 /// # Examples
641 ///
642 /// ```rust
643 /// use observable_property::ObservableProperty;
644 /// use std::sync::Arc;
645 ///
646 /// let property = ObservableProperty::new(10);
647 ///
648 /// property.subscribe(Arc::new(|old, new| {
649 /// println!("Value changed from {} to {}", old, new);
650 /// })).map_err(|e| {
651 /// eprintln!("Failed to subscribe: {}", e);
652 /// e
653 /// })?;
654 ///
655 /// property.set(20).map_err(|e| {
656 /// eprintln!("Failed to set property value: {}", e);
657 /// e
658 /// })?; // Triggers observer notification
659 /// # Ok::<(), observable_property::PropertyError>(())
660 /// ```
661 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
662 let (old_value, observers_snapshot) = {
663 let mut prop = self
664 .inner
665 .write()
666 .map_err(|_| PropertyError::WriteLockError {
667 context: "setting property value".to_string(),
668 })?;
669
670 let old_value = prop.value.clone();
671 prop.value = new_value.clone();
672 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
673 (old_value, observers_snapshot)
674 };
675
676 for observer in observers_snapshot {
677 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
678 observer(&old_value, &new_value);
679 })) {
680 eprintln!("Observer panic: {:?}", e);
681 }
682 }
683
684 Ok(())
685 }
686
687 /// Sets the property to a new value and notifies observers asynchronously
688 ///
689 /// This method is similar to `set()` but spawns observers in background threads
690 /// for non-blocking operation. This is useful when observers might perform
691 /// time-consuming operations.
692 ///
693 /// Observers are batched into groups and each batch runs in its own thread
694 /// to limit resource usage while still providing parallelism.
695 ///
696 /// # Arguments
697 ///
698 /// * `new_value` - The new value to set
699 ///
700 /// # Returns
701 ///
702 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
703 /// Note that this only indicates the property was updated successfully;
704 /// observer execution happens asynchronously.
705 ///
706 /// # Examples
707 ///
708 /// ```rust
709 /// use observable_property::ObservableProperty;
710 /// use std::sync::Arc;
711 /// use std::time::Duration;
712 ///
713 /// let property = ObservableProperty::new(0);
714 ///
715 /// property.subscribe(Arc::new(|old, new| {
716 /// // This observer does slow work but won't block the caller
717 /// std::thread::sleep(Duration::from_millis(100));
718 /// println!("Slow observer: {} -> {}", old, new);
719 /// })).map_err(|e| {
720 /// eprintln!("Failed to subscribe: {}", e);
721 /// e
722 /// })?;
723 ///
724 /// // This returns immediately even though observer is slow
725 /// property.set_async(42).map_err(|e| {
726 /// eprintln!("Failed to set value asynchronously: {}", e);
727 /// e
728 /// })?;
729 /// # Ok::<(), observable_property::PropertyError>(())
730 /// ```
731 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
732 let (old_value, observers_snapshot) = {
733 let mut prop = self
734 .inner
735 .write()
736 .map_err(|_| PropertyError::WriteLockError {
737 context: "setting property value".to_string(),
738 })?;
739
740 let old_value = prop.value.clone();
741 prop.value = new_value.clone();
742 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
743 (old_value, observers_snapshot)
744 };
745
746 if observers_snapshot.is_empty() {
747 return Ok(());
748 }
749
750 let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
751
752 for batch in observers_snapshot.chunks(observers_per_thread) {
753 let batch_observers = batch.to_vec();
754 let old_val = old_value.clone();
755 let new_val = new_value.clone();
756
757 thread::spawn(move || {
758 for observer in batch_observers {
759 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
760 observer(&old_val, &new_val);
761 })) {
762 eprintln!("Observer panic in batch thread: {:?}", e);
763 }
764 }
765 });
766 }
767
768 Ok(())
769 }
770
771 /// Subscribes an observer function to be called when the property changes
772 ///
773 /// The observer function will be called with the old and new values whenever
774 /// the property is modified via `set()` or `set_async()`.
775 ///
776 /// # Arguments
777 ///
778 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
779 ///
780 /// # Returns
781 ///
782 /// `Ok(ObserverId)` containing a unique identifier for this observer,
783 /// or `Err(PropertyError)` if the lock is poisoned.
784 ///
785 /// # Examples
786 ///
787 /// ```rust
788 /// use observable_property::ObservableProperty;
789 /// use std::sync::Arc;
790 ///
791 /// let property = ObservableProperty::new(0);
792 ///
793 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
794 /// println!("Property changed from {} to {}", old_value, new_value);
795 /// })).map_err(|e| {
796 /// eprintln!("Failed to subscribe observer: {}", e);
797 /// e
798 /// })?;
799 ///
800 /// // Later, unsubscribe using the returned ID
801 /// property.unsubscribe(observer_id).map_err(|e| {
802 /// eprintln!("Failed to unsubscribe observer: {}", e);
803 /// e
804 /// })?;
805 /// # Ok::<(), observable_property::PropertyError>(())
806 /// ```
807 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
808 let mut prop = self
809 .inner
810 .write()
811 .map_err(|_| PropertyError::WriteLockError {
812 context: "subscribing observer".to_string(),
813 })?;
814
815 let id = prop.next_id;
816 prop.next_id += 1;
817 prop.observers.insert(id, observer);
818 Ok(id)
819 }
820
821 /// Removes an observer identified by its ID
822 ///
823 /// # Arguments
824 ///
825 /// * `id` - The observer ID returned by `subscribe()`
826 ///
827 /// # Returns
828 ///
829 /// `Ok(bool)` where `true` means the observer was found and removed,
830 /// `false` means no observer with that ID existed.
831 /// Returns `Err(PropertyError)` if the lock is poisoned.
832 ///
833 /// # Examples
834 ///
835 /// ```rust
836 /// use observable_property::ObservableProperty;
837 /// use std::sync::Arc;
838 ///
839 /// let property = ObservableProperty::new(0);
840 /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
841 /// eprintln!("Failed to subscribe: {}", e);
842 /// e
843 /// })?;
844 ///
845 /// let was_removed = property.unsubscribe(id).map_err(|e| {
846 /// eprintln!("Failed to unsubscribe: {}", e);
847 /// e
848 /// })?;
849 /// assert!(was_removed); // Observer existed and was removed
850 ///
851 /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
852 /// eprintln!("Failed to unsubscribe again: {}", e);
853 /// e
854 /// })?;
855 /// assert!(!was_removed_again); // Observer no longer exists
856 /// # Ok::<(), observable_property::PropertyError>(())
857 /// ```
858 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
859 let mut prop = self
860 .inner
861 .write()
862 .map_err(|_| PropertyError::WriteLockError {
863 context: "unsubscribing observer".to_string(),
864 })?;
865
866 let was_present = prop.observers.remove(&id).is_some();
867 Ok(was_present)
868 }
869
870 /// Subscribes an observer that only gets called when a filter condition is met
871 ///
872 /// This is useful for observing only specific types of changes, such as
873 /// when a value increases or crosses a threshold.
874 ///
875 /// # Arguments
876 ///
877 /// * `observer` - The observer function to call when the filter passes
878 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
879 ///
880 /// # Returns
881 ///
882 /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
883 ///
884 /// # Examples
885 ///
886 /// ```rust
887 /// use observable_property::ObservableProperty;
888 /// use std::sync::Arc;
889 ///
890 /// let property = ObservableProperty::new(0);
891 ///
892 /// // Only notify when value increases
893 /// let id = property.subscribe_filtered(
894 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
895 /// |old, new| new > old
896 /// ).map_err(|e| {
897 /// eprintln!("Failed to subscribe filtered observer: {}", e);
898 /// e
899 /// })?;
900 ///
901 /// property.set(10).map_err(|e| {
902 /// eprintln!("Failed to set value: {}", e);
903 /// e
904 /// })?; // Triggers observer (0 -> 10)
905 /// property.set(5).map_err(|e| {
906 /// eprintln!("Failed to set value: {}", e);
907 /// e
908 /// })?; // Does NOT trigger observer (10 -> 5)
909 /// property.set(15).map_err(|e| {
910 /// eprintln!("Failed to set value: {}", e);
911 /// e
912 /// })?; // Triggers observer (5 -> 15)
913 /// # Ok::<(), observable_property::PropertyError>(())
914 /// ```
915 pub fn subscribe_filtered<F>(
916 &self,
917 observer: Observer<T>,
918 filter: F,
919 ) -> Result<ObserverId, PropertyError>
920 where
921 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
922 {
923 let filter = Arc::new(filter);
924 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
925 if filter(old_val, new_val) {
926 observer(old_val, new_val);
927 }
928 });
929
930 self.subscribe(filtered_observer)
931 }
932
933 pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
934 let prop = self
935 .inner
936 .read()
937 .map_err(|_| PropertyError::ReadLockError {
938 context: "notifying observers".to_string(),
939 })?;
940
941 for (old_val, new_val) in changes {
942 for observer in prop.observers.values() {
943 observer(&old_val, &new_val);
944 }
945 }
946 Ok(())
947 }
948
949 /// Subscribes an observer and returns a RAII guard for automatic cleanup
950 ///
951 /// This method is similar to `subscribe()` but returns a `Subscription` object
952 /// that automatically removes the observer when it goes out of scope. This
953 /// provides a more convenient and safer alternative to manual subscription
954 /// management.
955 ///
956 /// # Arguments
957 ///
958 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
959 ///
960 /// # Returns
961 ///
962 /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
963 /// or `Err(PropertyError)` if the lock is poisoned.
964 ///
965 /// # Examples
966 ///
967 /// ## Basic RAII Subscription
968 ///
969 /// ```rust
970 /// use observable_property::ObservableProperty;
971 /// use std::sync::Arc;
972 ///
973 /// # fn main() -> Result<(), observable_property::PropertyError> {
974 /// let property = ObservableProperty::new(0);
975 ///
976 /// {
977 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
978 /// println!("Value: {} -> {}", old, new);
979 /// }))?;
980 ///
981 /// property.set(42)?; // Prints: "Value: 0 -> 42"
982 /// property.set(100)?; // Prints: "Value: 42 -> 100"
983 ///
984 /// // Automatic cleanup when _subscription goes out of scope
985 /// }
986 ///
987 /// property.set(200)?; // No output - subscription was cleaned up
988 /// # Ok(())
989 /// # }
990 /// ```
991 ///
992 /// ## Comparison with Manual Management
993 ///
994 /// ```rust
995 /// use observable_property::ObservableProperty;
996 /// use std::sync::Arc;
997 ///
998 /// # fn main() -> Result<(), observable_property::PropertyError> {
999 /// let property = ObservableProperty::new("initial".to_string());
1000 ///
1001 /// // Method 1: Manual subscription management (traditional approach)
1002 /// let observer_id = property.subscribe(Arc::new(|old, new| {
1003 /// println!("Manual: {} -> {}", old, new);
1004 /// }))?;
1005 ///
1006 /// // Method 2: RAII subscription management (recommended)
1007 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1008 /// println!("RAII: {} -> {}", old, new);
1009 /// }))?;
1010 ///
1011 /// // Both observers will be called
1012 /// property.set("changed".to_string())?;
1013 /// // Prints:
1014 /// // "Manual: initial -> changed"
1015 /// // "RAII: initial -> changed"
1016 ///
1017 /// // Manual cleanup required for first observer
1018 /// property.unsubscribe(observer_id)?;
1019 ///
1020 /// // Second observer (_subscription) is automatically cleaned up when
1021 /// // the variable goes out of scope - no manual intervention needed
1022 /// # Ok(())
1023 /// # }
1024 /// ```
1025 ///
1026 /// ## Error Handling with Early Returns
1027 ///
1028 /// ```rust
1029 /// use observable_property::ObservableProperty;
1030 /// use std::sync::Arc;
1031 ///
1032 /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1033 /// let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1034 /// println!("Processing: {} -> {}", old, new);
1035 /// }))?;
1036 ///
1037 /// property.set(1)?;
1038 ///
1039 /// if property.get()? > 0 {
1040 /// return Ok(()); // Subscription automatically cleaned up on early return
1041 /// }
1042 ///
1043 /// property.set(2)?;
1044 /// Ok(()) // Subscription automatically cleaned up on normal return
1045 /// }
1046 ///
1047 /// # fn main() -> Result<(), observable_property::PropertyError> {
1048 /// let property = ObservableProperty::new(0);
1049 /// process_with_monitoring(&property)?; // Monitoring active only during function call
1050 /// property.set(99)?; // No monitoring output - subscription was cleaned up
1051 /// # Ok(())
1052 /// # }
1053 /// ```
1054 ///
1055 /// ## Multi-threaded Subscription Management
1056 ///
1057 /// ```rust
1058 /// use observable_property::ObservableProperty;
1059 /// use std::sync::Arc;
1060 /// use std::thread;
1061 ///
1062 /// # fn main() -> Result<(), observable_property::PropertyError> {
1063 /// let property = Arc::new(ObservableProperty::new(0));
1064 /// let property_clone = property.clone();
1065 ///
1066 /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1067 /// let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1068 /// println!("Thread observer: {} -> {}", old, new);
1069 /// }))?;
1070 ///
1071 /// property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1072 ///
1073 /// // Subscription automatically cleaned up when thread ends
1074 /// Ok(())
1075 /// });
1076 ///
1077 /// handle.join().unwrap()?;
1078 /// property.set(100)?; // No output - thread subscription was cleaned up
1079 /// # Ok(())
1080 /// # }
1081 /// ```
1082 ///
1083 /// # Use Cases
1084 ///
1085 /// This method is particularly useful in scenarios such as:
1086 /// - Temporary observers that should be active only during a specific scope
1087 /// - Error-prone code where manual cleanup might be forgotten
1088 /// - Complex control flow where multiple exit points make manual cleanup difficult
1089 /// - Resource-constrained environments where observer leaks are problematic
1090 pub fn subscribe_with_subscription(
1091 &self,
1092 observer: Observer<T>,
1093 ) -> Result<Subscription<T>, PropertyError> {
1094 let id = self.subscribe(observer)?;
1095 Ok(Subscription {
1096 inner: Arc::clone(&self.inner),
1097 id,
1098 })
1099 }
1100
1101 /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1102 ///
1103 /// This method combines the functionality of `subscribe_filtered()` with the automatic
1104 /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1105 /// called when the filter condition is satisfied, and it will be automatically
1106 /// unsubscribed when the returned `Subscription` goes out of scope.
1107 ///
1108 /// # Arguments
1109 ///
1110 /// * `observer` - The observer function to call when the filter passes
1111 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1112 ///
1113 /// # Returns
1114 ///
1115 /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1116 /// or `Err(PropertyError)` if the lock is poisoned.
1117 ///
1118 /// # Examples
1119 ///
1120 /// ## Basic Filtered RAII Subscription
1121 ///
1122 /// ```rust
1123 /// use observable_property::ObservableProperty;
1124 /// use std::sync::Arc;
1125 ///
1126 /// # fn main() -> Result<(), observable_property::PropertyError> {
1127 /// let counter = ObservableProperty::new(0);
1128 ///
1129 /// {
1130 /// // Monitor only increases with automatic cleanup
1131 /// let _increase_monitor = counter.subscribe_filtered_with_subscription(
1132 /// Arc::new(|old, new| {
1133 /// println!("Counter increased: {} -> {}", old, new);
1134 /// }),
1135 /// |old, new| new > old
1136 /// )?;
1137 ///
1138 /// counter.set(5)?; // Prints: "Counter increased: 0 -> 5"
1139 /// counter.set(3)?; // No output (decrease)
1140 /// counter.set(7)?; // Prints: "Counter increased: 3 -> 7"
1141 ///
1142 /// // Subscription automatically cleaned up when leaving scope
1143 /// }
1144 ///
1145 /// counter.set(10)?; // No output - subscription was cleaned up
1146 /// # Ok(())
1147 /// # }
1148 /// ```
1149 ///
1150 /// ## Multi-Condition Temperature Monitoring
1151 ///
1152 /// ```rust
1153 /// use observable_property::ObservableProperty;
1154 /// use std::sync::Arc;
1155 ///
1156 /// # fn main() -> Result<(), observable_property::PropertyError> {
1157 /// let temperature = ObservableProperty::new(20.0_f64);
1158 ///
1159 /// {
1160 /// // Create filtered subscription that only triggers for significant temperature increases
1161 /// let _heat_warning = temperature.subscribe_filtered_with_subscription(
1162 /// Arc::new(|old_temp, new_temp| {
1163 /// println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1164 /// old_temp, new_temp);
1165 /// }),
1166 /// |old, new| new > old && (new - old) > 5.0 // Only trigger for increases > 5°C
1167 /// )?;
1168 ///
1169 /// // Create another filtered subscription for cooling alerts
1170 /// let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1171 /// Arc::new(|old_temp, new_temp| {
1172 /// println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1173 /// old_temp, new_temp);
1174 /// }),
1175 /// |old, new| new < old && (old - new) > 3.0 // Only trigger for decreases > 3°C
1176 /// )?;
1177 ///
1178 /// // Test the filters
1179 /// temperature.set(22.0)?; // No alerts (increase of only 2°C)
1180 /// temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1181 /// temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1182 ///
1183 /// // Both subscriptions are automatically cleaned up when they go out of scope
1184 /// }
1185 ///
1186 /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1187 /// # Ok(())
1188 /// # }
1189 /// ```
1190 ///
1191 /// ## Conditional Monitoring with Complex Filters
1192 ///
1193 /// ```rust
1194 /// use observable_property::ObservableProperty;
1195 /// use std::sync::Arc;
1196 ///
1197 /// # fn main() -> Result<(), observable_property::PropertyError> {
1198 /// let stock_price = ObservableProperty::new(100.0_f64);
1199 ///
1200 /// {
1201 /// // Monitor significant price movements (> 5% change)
1202 /// let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1203 /// Arc::new(|old_price, new_price| {
1204 /// let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1205 /// println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1206 /// old_price, new_price, change_percent);
1207 /// }),
1208 /// |old, new| {
1209 /// let change_percent = ((new - old) / old * 100.0).abs();
1210 /// change_percent > 5.0 // Trigger on > 5% change
1211 /// }
1212 /// )?;
1213 ///
1214 /// stock_price.set(103.0)?; // No alert (3% change)
/// stock_price.set(108.0)?; // No alert (~4.85% change from 103.0 is below the 5% threshold)
1216 /// stock_price.set(95.0)?; // Alert triggered (12% decrease)
1217 ///
1218 /// // Subscription automatically cleaned up when leaving scope
1219 /// }
1220 ///
1221 /// stock_price.set(200.0)?; // No alert - monitoring ended
1222 /// # Ok(())
1223 /// # }
1224 /// ```
1225 ///
1226 /// ## Cross-Thread Filtered Monitoring
1227 ///
1228 /// ```rust
1229 /// use observable_property::ObservableProperty;
1230 /// use std::sync::Arc;
1231 /// use std::thread;
1232 /// use std::time::Duration;
1233 ///
1234 /// # fn main() -> Result<(), observable_property::PropertyError> {
1235 /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1236 /// let latency_clone = network_latency.clone();
1237 ///
1238 /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1239 /// // Monitor high latency in background thread with automatic cleanup
1240 /// let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1241 /// Arc::new(|old_ms, new_ms| {
1242 /// println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1243 /// }),
1244 /// |_, new| *new > 100 // Alert when latency exceeds 100ms
1245 /// )?;
1246 ///
1247 /// // Simulate monitoring for a short time
1248 /// thread::sleep(Duration::from_millis(10));
1249 ///
1250 /// // Subscription automatically cleaned up when thread ends
1251 /// Ok(())
1252 /// });
1253 ///
1254 /// // Simulate network conditions
1255 /// network_latency.set(80)?; // No alert (under threshold)
/// network_latency.set(150)?; // Over threshold: alerts only if the monitor thread's subscription is active at this moment
1257 ///
1258 /// monitor_handle.join().unwrap()?;
1259 /// network_latency.set(200)?; // No alert - background monitoring ended
1260 /// # Ok(())
1261 /// # }
1262 /// ```
1263 ///
1264 /// # Use Cases
1265 ///
1266 /// This method is ideal for:
1267 /// - Threshold-based monitoring with automatic cleanup
1268 /// - Temporary conditional observers in specific code blocks
1269 /// - Event-driven systems where observers should be active only during certain phases
1270 /// - Resource management scenarios where filtered observers have limited lifetimes
1271 ///
1272 /// # Performance Notes
1273 ///
1274 /// The filter function is evaluated for every property change, so it should be
1275 /// lightweight. Complex filtering logic should be optimized to avoid performance
1276 /// bottlenecks, especially in high-frequency update scenarios.
1277 pub fn subscribe_filtered_with_subscription<F>(
1278 &self,
1279 observer: Observer<T>,
1280 filter: F,
1281 ) -> Result<Subscription<T>, PropertyError>
1282 where
1283 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1284 {
1285 let id = self.subscribe_filtered(observer, filter)?;
1286 Ok(Subscription {
1287 inner: Arc::clone(&self.inner),
1288 id,
1289 })
1290 }
1291}
1292
1293impl<T: Clone> Clone for ObservableProperty<T> {
1294 /// Creates a new reference to the same observable property
1295 ///
1296 /// This creates a new `ObservableProperty` instance that shares the same
1297 /// underlying data with the original. Changes made through either instance
1298 /// will be visible to observers subscribed through both instances.
1299 ///
1300 /// # Examples
1301 ///
1302 /// ```rust
1303 /// use observable_property::ObservableProperty;
1304 /// use std::sync::Arc;
1305 ///
1306 /// let property1 = ObservableProperty::new(42);
1307 /// let property2 = property1.clone();
1308 ///
1309 /// property2.subscribe(Arc::new(|old, new| {
1310 /// println!("Observer on property2 saw change: {} -> {}", old, new);
1311 /// })).map_err(|e| {
1312 /// eprintln!("Failed to subscribe: {}", e);
1313 /// e
1314 /// })?;
1315 ///
1316 /// // This change through property1 will trigger the observer on property2
1317 /// property1.set(100).map_err(|e| {
1318 /// eprintln!("Failed to set value: {}", e);
1319 /// e
1320 /// })?;
1321 /// # Ok::<(), observable_property::PropertyError>(())
1322 /// ```
1323 fn clone(&self) -> Self {
1324 Self {
1325 inner: Arc::clone(&self.inner),
1326 max_threads: self.max_threads,
1327 }
1328 }
1329}
1330
1331impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1332 /// Debug implementation that shows the current value if accessible
1333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334 match self.get() {
1335 Ok(value) => f
1336 .debug_struct("ObservableProperty")
1337 .field("value", &value)
1338 .field("observers_count", &"[hidden]")
1339 .field("max_threads", &self.max_threads)
1340 .finish(),
1341 Err(_) => f
1342 .debug_struct("ObservableProperty")
1343 .field("value", &"[inaccessible]")
1344 .field("observers_count", &"[hidden]")
1345 .finish(),
1346 }
1347 }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352 use super::*;
1353 use std::sync::atomic::{AtomicUsize, Ordering};
1354 use std::time::Duration;
1355
#[test]
fn test_property_creation_and_basic_operations() {
    let prop = ObservableProperty::new(42);

    // A fresh property reports the value it was constructed with.
    let initial = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get initial value: {}", e));
    assert_eq!(initial, 42);

    // Writing a new value must succeed...
    prop.set(100)
        .unwrap_or_else(|e| panic!("Failed to set value: {}", e));

    // ...and be visible to the next read.
    let updated = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get updated value: {}", e));
    assert_eq!(updated, 100);
}
1376
#[test]
fn test_observer_subscription_and_notification() {
    let prop = ObservableProperty::new("initial".to_string());

    // Shared state the observer writes into so the test can inspect it.
    let notification_count = Arc::new(AtomicUsize::new(0));
    let last_old_value = Arc::new(RwLock::new(String::new()));
    let last_new_value = Arc::new(RwLock::new(String::new()));

    let hits = Arc::clone(&notification_count);
    let seen_old = Arc::clone(&last_old_value);
    let seen_new = Arc::clone(&last_new_value);

    let observer_id = prop
        .subscribe(Arc::new(move |old, new| {
            hits.fetch_add(1, Ordering::SeqCst);
            if let Ok(mut slot) = seen_old.write() {
                *slot = old.clone();
            }
            if let Ok(mut slot) = seen_new.write() {
                *slot = new.clone();
            }
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe observer: {}", e));

    // One change must produce exactly one notification with both values.
    prop.set("changed".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value: {}", e));

    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    let recorded_old = last_old_value
        .read()
        .unwrap_or_else(|e| panic!("Failed to read old value: {:?}", e));
    assert_eq!(*recorded_old, "initial");

    let recorded_new = last_new_value
        .read()
        .unwrap_or_else(|e| panic!("Failed to read new value: {:?}", e));
    assert_eq!(*recorded_new, "changed");

    // Removing the observer reports that it was present.
    let was_present = prop
        .unsubscribe(observer_id)
        .unwrap_or_else(|e| panic!("Failed to unsubscribe observer: {}", e));
    assert!(was_present);

    // A change after unsubscribing must not reach the observer.
    prop.set("not_notified".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value after unsubscribe: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);
}
1430
#[test]
fn test_filtered_observer() {
    let prop = ObservableProperty::new(0i32);
    let notification_count = Arc::new(AtomicUsize::new(0));
    let hits = Arc::clone(&notification_count);

    // The filter lets a change through only when the value increases.
    let observer_id = prop
        .subscribe_filtered(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .unwrap_or_else(|e| panic!("Failed to subscribe filtered observer: {}", e));

    // 0 -> 5 is an increase: notified.
    prop.set(5)
        .unwrap_or_else(|e| panic!("Failed to set property value to 5: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 5 -> 3 is a decrease: filtered out.
    prop.set(3)
        .unwrap_or_else(|e| panic!("Failed to set property value to 3: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 3 -> 10 is an increase: notified again.
    prop.set(10)
        .unwrap_or_else(|e| panic!("Failed to set property value to 10: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 2);

    if let Err(e) = prop.unsubscribe(observer_id) {
        panic!("Failed to unsubscribe filtered observer: {}", e);
    }
}
1471
#[test]
fn test_thread_safety_concurrent_reads() {
    let prop = Arc::new(ObservableProperty::new(42i32));
    let num_threads = 10;
    let reads_per_thread = 100;

    // Start the reader threads; each repeatedly reads the never-changing value.
    let mut readers = Vec::new();
    for _ in 0..num_threads {
        let shared = prop.clone();
        readers.push(thread::spawn(move || {
            for _ in 0..reads_per_thread {
                let value = shared
                    .get()
                    .unwrap_or_else(|e| panic!("Failed to read property value: {}", e));
                assert_eq!(value, 42);
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // A failed join means a reader panicked (failed read or wrong value).
    for reader in readers {
        reader
            .join()
            .unwrap_or_else(|e| panic!("Thread failed to complete: {:?}", e));
    }
}
1499
#[test]
fn test_async_set_performance() {
    // How long the observer blocks on every notification.
    let observer_delay = Duration::from_millis(50);
    // Generous upper bound for the background notification to finish. The
    // test normally completes long before this; the bound only prevents a
    // hang if the notification never arrives.
    let max_wait = Duration::from_secs(2);

    let prop = ObservableProperty::new(0i32);
    let slow_observer_count = Arc::new(AtomicUsize::new(0));
    let count_clone = slow_observer_count.clone();

    // Add observer that simulates slow work
    let _id = match prop.subscribe(Arc::new(move |_, _| {
        thread::sleep(observer_delay);
        count_clone.fetch_add(1, Ordering::SeqCst);
    })) {
        Ok(id) => id,
        Err(e) => panic!("Failed to subscribe slow observer: {}", e),
    };

    // Test synchronous set (should be slow)
    let start = std::time::Instant::now();
    if let Err(e) = prop.set(1) {
        panic!("Failed to set property value synchronously: {}", e);
    }
    let sync_duration = start.elapsed();

    // Test asynchronous set (should be fast)
    let start = std::time::Instant::now();
    if let Err(e) = prop.set_async(2) {
        panic!("Failed to set property value asynchronously: {}", e);
    }
    let async_duration = start.elapsed();

    // Async must return before the slow observer could have finished. The
    // bound is the observer's own delay rather than an arbitrary wall-clock
    // threshold, which would be flaky on a loaded machine.
    assert!(async_duration < sync_duration);
    assert!(async_duration < observer_delay);

    // Wait for the background notification by polling with a deadline
    // instead of sleeping for a fixed, tightly-tuned interval.
    let deadline = std::time::Instant::now() + max_wait;
    while slow_observer_count.load(Ordering::SeqCst) < 2 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }

    // The single observer should have run once per set call (sync + async).
    assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
}
1539
#[test]
fn test_lock_poisoning() {
    let prop = Arc::new(ObservableProperty::new(0));
    let victim = prop.clone();

    // Poison the inner lock: take the write guard on another thread and
    // panic while still holding it.
    let poisoner = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });

    // The join result is an error by design (the thread panicked); ignore it.
    let _ = poisoner.join();

    // Every operation must now report a lock-related error instead of succeeding.
    match prop.get() {
        Err(PropertyError::PoisonedLock) => {} // Expected error
        Err(e) => panic!("Expected PoisonedLock error, got: {:?}", e),
        Ok(_) => panic!("get() should fail on a poisoned lock"),
    }

    match prop.set(42) {
        // Either variant is acceptable
        Err(PropertyError::WriteLockError { .. }) | Err(PropertyError::PoisonedLock) => {}
        Err(e) => panic!("Expected lock-related error, got: {:?}", e),
        Ok(_) => panic!("set() should fail on a poisoned lock"),
    }

    match prop.subscribe(Arc::new(|_, _| {})) {
        // Either variant is acceptable
        Err(PropertyError::WriteLockError { .. }) | Err(PropertyError::PoisonedLock) => {}
        Err(e) => panic!("Expected lock-related error, got: {:?}", e),
        Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
    }
}
1581
#[test]
fn test_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let call_counts = Arc::new(AtomicUsize::new(0));

    // Registered first: an observer that always panics.
    let faulty = Arc::new(|_, _| {
        panic!("This observer deliberately panics");
    });
    let panic_observer_id = prop
        .subscribe(faulty)
        .expect("Failed to subscribe panic observer");

    // Registered second: a well-behaved observer that counts its calls.
    let tally = Arc::clone(&call_counts);
    let normal_observer_id = prop
        .subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe normal observer");

    // The panic inside the first observer must not escape from `set`.
    prop.set(42).expect("Failed to set property value");

    // The well-behaved observer was still notified.
    assert_eq!(call_counts.load(Ordering::SeqCst), 1);

    // Remove both observers again.
    prop.unsubscribe(panic_observer_id)
        .expect("Failed to unsubscribe panic observer");
    prop.unsubscribe(normal_observer_id)
        .expect("Failed to unsubscribe normal observer");
}
1612
#[test]
fn test_unsubscribe_nonexistent_observer() {
    let property = ObservableProperty::new(0);

    // Generate a valid observer ID
    let valid_id = property
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe test observer");

    // Create an ID that doesn't exist (valid_id + 1000 should not exist)
    let nonexistent_id = valid_id + 1000;

    // Unsubscribing an unknown ID is not an error; it reports `false`.
    match property.unsubscribe(nonexistent_id) {
        Ok(was_present) => {
            assert!(
                !was_present,
                "Unsubscribe should return false for nonexistent ID"
            );
        }
        Err(e) => panic!("Unsubscribe returned error: {:?}", e),
    }

    // The first unsubscribe of a live ID must report that it was present.
    // This is asserted so the double-unsubscribe check below is meaningful.
    let first = property
        .unsubscribe(valid_id)
        .expect("Failed first unsubscribe");
    assert!(first, "First unsubscribe should return true");

    // The same ID is gone now, so a second attempt reports `false`.
    let second = property
        .unsubscribe(valid_id)
        .expect("Failed second unsubscribe");
    assert!(!second, "Second unsubscribe should return false");
}
1640
#[test]
fn test_observer_id_wraparound() {
    let prop = ObservableProperty::new(0);

    // Registers a do-nothing observer and returns its ID.
    let subscribe_noop =
        |failure_msg: &str| prop.subscribe(Arc::new(|_, _| {})).expect(failure_msg);

    // Three consecutive registrations; this checks ordinary sequential
    // allocation of IDs (no actual wraparound is provoked here).
    let id1 = subscribe_noop("Failed to subscribe observer 1");
    let id2 = subscribe_noop("Failed to subscribe observer 2");
    let id3 = subscribe_noop("Failed to subscribe observer 3");

    // IDs are strictly increasing...
    assert!(id2 > id1, "Observer IDs should increment");
    assert!(id3 > id2, "Observer IDs should continue incrementing");
    // ...and consecutive.
    assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
    assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");

    // Remove the observers again.
    prop.unsubscribe(id1)
        .expect("Failed to unsubscribe observer 1");
    prop.unsubscribe(id2)
        .expect("Failed to unsubscribe observer 2");
    prop.unsubscribe(id3)
        .expect("Failed to unsubscribe observer 3");
}
1660
#[test]
fn test_concurrent_subscribe_unsubscribe() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 100;

    // Each worker interleaves subscriptions with occasional removals.
    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared = prop.clone();
        workers.push(thread::spawn(move || {
            // IDs this worker has registered and not yet removed.
            let mut owned_ids = Vec::new();

            for i in 0..operations_per_thread {
                let new_id = shared
                    .subscribe(Arc::new(move |old, new| {
                        // Touch every captured and passed value so the
                        // observer does a little real work.
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Subscribe should succeed");
                owned_ids.push(new_id);

                // Every tenth iteration, drop one of the held observers.
                if i % 10 == 0 && !owned_ids.is_empty() {
                    let victim = owned_ids.remove(i % owned_ids.len());
                    shared
                        .unsubscribe(victim)
                        .expect("Unsubscribe should succeed");
                }
            }

            // Remove whatever this worker still owns.
            for leftover in owned_ids {
                shared
                    .unsubscribe(leftover)
                    .expect("Final cleanup should succeed");
            }
        }));
    }

    // All workers must finish without panicking.
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must remain usable afterwards.
    prop.set(42)
        .expect("Property should still work after concurrent operations");
}
1713
#[test]
fn test_multiple_observer_panics_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // Two observers that always panic, registered first.
    let _panic_id1 = prop
        .subscribe(Arc::new(|_, _| panic!("First panic observer")))
        .expect("Failed to subscribe first panic observer");
    let _panic_id2 = prop
        .subscribe(Arc::new(|_, _| panic!("Second panic observer")))
        .expect("Failed to subscribe second panic observer");

    // Two well-behaved observers sharing one counter.
    let first_tally = Arc::clone(&successful_calls);
    let _success_id1 = prop
        .subscribe(Arc::new(move |_, _| {
            first_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe first success observer");

    let second_tally = Arc::clone(&successful_calls);
    let _success_id2 = prop
        .subscribe(Arc::new(move |_, _| {
            second_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe second success observer");

    // A single change notifies all four observers.
    prop.set(42)
        .expect("Failed to set property value for panic isolation test");

    // Neither panic may have prevented the two healthy observers from running.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
}
1753
#[test]
fn test_async_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // An observer that always panics.
    let _panic_id = prop
        .subscribe(Arc::new(|_, _| panic!("Async panic observer")))
        .expect("Failed to subscribe async panic observer");

    // A well-behaved observer that counts its calls.
    let tally = Arc::clone(&successful_calls);
    let _success_id = prop
        .subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe async success observer");

    // Notify through the asynchronous path this time.
    prop.set_async(42)
        .expect("Failed to set property value asynchronously");

    // Give the asynchronous notification time to finish.
    let settle_time = Duration::from_millis(100);
    thread::sleep(settle_time);

    // The healthy observer ran exactly once despite the other one panicking.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
}
1783
#[test]
fn test_very_large_observer_count() {
    let prop = ObservableProperty::new(0);
    let total_calls = Arc::new(AtomicUsize::new(0));
    // Number of notifications that carried unexpected values. Observer
    // panics are isolated by the property, so an `assert!` inside an
    // observer can never fail this test; mismatches are recorded here and
    // checked on the test thread instead.
    let wrong_values = Arc::new(AtomicUsize::new(0));
    let observer_count = 1000;

    // Subscribe many observers
    let mut observer_ids = Vec::with_capacity(observer_count);
    for _ in 0..observer_count {
        let calls = total_calls.clone();
        let wrong = wrong_values.clone();
        let id = prop
            .subscribe(Arc::new(move |old, new| {
                calls.fetch_add(1, Ordering::SeqCst);
                // Every observer sees the same single change:
                // 0 -> observer_count.
                if *old != 0 || *new != observer_count {
                    wrong.fetch_add(1, Ordering::SeqCst);
                }
            }))
            .expect("Failed to subscribe large observer count test observer");
        observer_ids.push(id);
    }

    // Trigger all observers
    prop.set(observer_count)
        .expect("Failed to set property value for large observer count test");

    // All observers should have been called
    assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);

    // ...and each of them must have received the right (old, new) pair.
    assert_eq!(
        wrong_values.load(Ordering::SeqCst),
        0,
        "Observers received unexpected old/new values"
    );

    // Clean up
    for id in observer_ids {
        prop.unsubscribe(id)
            .expect("Failed to unsubscribe observer in large count test");
    }
}
1816
#[test]
fn test_observer_with_mutable_state() {
    let prop = ObservableProperty::new(0);
    let call_history = Arc::new(RwLock::new(Vec::new()));

    // The observer appends every (old, new) pair it sees to the shared log.
    let log = Arc::clone(&call_history);
    let observer_id = prop
        .subscribe(Arc::new(move |old, new| {
            if let Ok(mut entries) = log.write() {
                entries.push((*old, *new));
            }
        }))
        .expect("Failed to subscribe mutable state observer");

    // Three consecutive changes: 0 -> 1 -> 2 -> 3.
    prop.set(1).expect("Failed to set property to 1");
    prop.set(2).expect("Failed to set property to 2");
    prop.set(3).expect("Failed to set property to 3");

    // The log must contain exactly those three transitions, in order.
    let recorded = call_history.read().expect("Failed to read call history");
    let expected = [(0, 1), (1, 2), (2, 3)];
    assert_eq!(recorded.len(), 3);
    for (idx, transition) in expected.iter().enumerate() {
        assert_eq!(recorded[idx], *transition);
    }

    prop.unsubscribe(observer_id)
        .expect("Failed to unsubscribe mutable state observer");
}
1845
#[test]
fn test_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    // The inner scope bounds the lifetime of the subscription guard.
    {
        let tally = Arc::clone(&call_count);
        let _guard = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                tally.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription for automatic cleanup test");

        // While the guard is alive, changes reach the observer.
        prop.set(1)
            .expect("Failed to set property value in subscription test");
        assert_eq!(call_count.load(Ordering::SeqCst), 1);

        // `_guard` is dropped at the end of this scope.
    }

    // After the guard is gone, further changes must not be observed:
    // the counter stays at 1.
    prop.set(2)
        .expect("Failed to set property value after subscription dropped");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
1871
/// Calling `drop` on a subscription guard must deactivate its observer
/// immediately.
#[test]
fn test_subscription_explicit_drop() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let guard = {
        let counter = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for explicit drop test")
    };

    // First change is observed
    prop.set(1).expect("Failed to set property value before explicit drop");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Release the guard by hand
    drop(guard);

    // Second change must go unobserved
    prop.set(2).expect("Failed to set property value after explicit drop");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
1895
/// Three independent subscription guards on one property: dropping any one of
/// them must silence only its own observer.
#[test]
fn test_multiple_subscriptions_with_cleanup() {
    let prop = ObservableProperty::new(0);

    // One counter per observer, in subscription order
    let counters: Vec<Arc<AtomicUsize>> =
        (0..3).map(|_| Arc::new(AtomicUsize::new(0))).collect();
    let snapshot = || {
        counters
            .iter()
            .map(|counter| counter.load(Ordering::SeqCst))
            .collect::<Vec<usize>>()
    };

    let failure_messages = [
        "Failed to create first subscription",
        "Failed to create second subscription",
        "Failed to create third subscription",
    ];

    // Guards are kept in `Option`s so each can be dropped individually later
    let mut subscriptions = Vec::new();
    for (counter, &failure_message) in counters.iter().zip(failure_messages.iter()) {
        let counter = Arc::clone(counter);
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .expect(failure_message);
        subscriptions.push(Some(subscription));
    }

    // Every observer fires once
    prop.set(1).expect("Failed to set property value with all subscriptions");
    assert_eq!(snapshot(), vec![1, 1, 1]);

    // Release the second guard only
    drop(subscriptions[1].take());

    // First and third still fire; second stays where it was
    prop.set(2).expect("Failed to set property value with partial subscriptions");
    assert_eq!(snapshot(), vec![2, 1, 2]);

    // Release the first and third guards
    drop(subscriptions[0].take());
    drop(subscriptions[2].take());

    // Nobody is listening any more
    prop.set(3).expect("Failed to set property value with no subscriptions");
    assert_eq!(snapshot(), vec![2, 1, 2]);
}
1950
/// Dropping a subscription guard after the property's inner lock has been
/// poisoned must not panic. Reaching the end of the test is the pass criterion.
#[test]
fn test_subscription_drop_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Register an observer while the lock is still healthy
    let counter = Arc::new(AtomicUsize::new(0));
    let subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for poisoned lock test");

    // A helper thread panics while holding the write guard, which poisons the
    // lock; the join result (the panic payload) is intentionally discarded.
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner.inner.write().expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // The guard's cleanup now runs against a poisoned lock and must complete
    // without panicking.
    drop(subscription);
}
1978
/// An id-based observer (`subscribe` / `unsubscribe`) and a guard-based
/// observer (`subscribe_with_subscription`) on the same property must be
/// independent of each other.
#[test]
fn test_subscription_vs_manual_unsubscribe() {
    let prop = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));
    let manual_calls = || manual_count.load(Ordering::SeqCst);
    let auto_calls = || auto_count.load(Ordering::SeqCst);

    // Observer removed explicitly by id
    let manual_id = {
        let counter = Arc::clone(&manual_count);
        prop.subscribe(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create manual subscription")
    };

    // Observer removed when its guard is dropped at the end of the test
    let _auto_subscription = {
        let counter = Arc::clone(&auto_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create automatic subscription")
    };

    // Both observers see the first change
    prop.set(1).expect("Failed to set property value with both subscriptions");
    assert_eq!(manual_calls(), 1);
    assert_eq!(auto_calls(), 1);

    // Remove the id-based observer
    prop.unsubscribe(manual_id).expect("Failed to manually unsubscribe");

    // Only the guard-based observer sees the second change
    prop.set(2).expect("Failed to set property value after manual unsubscribe");
    assert_eq!(manual_calls(), 1);
    assert_eq!(auto_calls(), 2);
}
2016
/// `subscribe_with_subscription` must report an error (either
/// `WriteLockError` or `PoisonedLock`) once the inner lock is poisoned.
#[test]
fn test_subscribe_with_subscription_error_handling() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Poison the lock: panic in a helper thread while holding the write guard
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner.inner.write().expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    let outcome = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(outcome.is_err());

    // Both error variants are accepted; anything else fails the test
    let error = outcome.expect_err("Expected error for poisoned lock");
    if !matches!(
        error,
        PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock
    ) {
        panic!("Unexpected error type: {:?}", error);
    }
}
2039
/// An observer registered through one handle of a cloned property must be
/// notified about changes made through either handle.
#[test]
fn test_subscription_with_property_cloning() {
    let prop1 = ObservableProperty::new(0);
    let prop2 = prop1.clone();
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    // Register through the original handle
    let _subscription = {
        let counter = Arc::clone(&call_count);
        prop1
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription for cloned property test")
    };

    // A write through the clone reaches the observer
    prop2.set(42).expect("Failed to set property value through prop2");
    assert_eq!(calls(), 1);

    // A write through the original reaches it as well
    prop1.set(100).expect("Failed to set property value through prop1");
    assert_eq!(calls(), 2);
}
2062
/// A subscription guard created inside an `if` body must be released when that
/// body ends, just like in any other block scope.
#[test]
fn test_subscription_in_conditional_blocks() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    let should_subscribe = true;

    if should_subscribe {
        let counter = Arc::clone(&call_count);
        let _guard = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription in conditional block");

        // Inside the branch the observer is live
        prop.set(1).expect("Failed to set property value in conditional block");
        assert_eq!(calls(), 1);

        // `_guard` is dropped at the end of the branch
    }

    // Outside the branch the observer must be gone
    prop.set(2).expect("Failed to set property value after conditional block");
    assert_eq!(calls(), 1);
}
2089
/// A subscription guard held in a function must be released on every exit
/// path: an early `return` as well as normal fall-through.
///
/// The call counter is owned by the test body and lent to the helper, so the
/// test can assert after the helper returns that the observer no longer fires.
#[test]
fn test_subscription_with_early_return() {
    /// Subscribes an observer that bumps `call_count`, performs one or two
    /// updates and returns, dropping the guard on the way out.
    ///
    /// `call_count` must start at zero. Returns any error from subscribing or
    /// setting the property.
    fn test_function(
        prop: &ObservableProperty<i32>,
        call_count: &Arc<AtomicUsize>,
        should_return_early: bool,
    ) -> Result<(), PropertyError> {
        let count = Arc::clone(call_count);

        let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
            count.fetch_add(1, Ordering::SeqCst);
        }))?;

        prop.set(1)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 1);

        if should_return_early {
            return Ok(()); // Guard is dropped on this path
        }

        prop.set(2)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 2);

        Ok(())
        // Guard is dropped on normal exit as well
    }

    let prop = ObservableProperty::new(0);

    // Early-return path: exactly one notification inside the helper
    let early_calls = Arc::new(AtomicUsize::new(0));
    test_function(&prop, &early_calls, true).expect("Failed to test early return");

    // The observer must not fire once the helper has returned early
    prop.set(10).expect("Failed to set property value after early return");
    assert_eq!(early_calls.load(Ordering::SeqCst), 1);

    // Normal-exit path: two notifications inside the helper
    let normal_calls = Arc::new(AtomicUsize::new(0));
    test_function(&prop, &normal_calls, false).expect("Failed to test normal exit");

    // Neither observer may fire after its helper call has finished
    prop.set(20).expect("Failed to set property value after normal exit");
    assert_eq!(normal_calls.load(Ordering::SeqCst), 2);
    assert_eq!(early_calls.load(Ordering::SeqCst), 1);
}
2131
/// Moving a subscription guard to a new binding must keep the observer alive;
/// only dropping the final owner deactivates it.
#[test]
fn test_subscription_move_semantics() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    let subscription = {
        let counter = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for move semantics test")
    };

    // Live before the move
    prop.set(1).expect("Failed to set property value before move");
    assert_eq!(calls(), 1);

    // Transfer ownership of the guard to another binding
    let moved_subscription = subscription;

    // Still live after the move
    prop.set(2).expect("Failed to set property value after move");
    assert_eq!(calls(), 2);

    // Dropping the new owner ends the subscription
    drop(moved_subscription);

    prop.set(3).expect("Failed to set property value after moved subscription drop");
    assert_eq!(calls(), 2);
}
2162
/// A filtered subscription guard must notify only for changes accepted by its
/// filter (increases here) and must stop notifying once it leaves scope.
#[test]
fn test_filtered_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    {
        let counter = Arc::clone(&call_count);
        let _guard = prop
            .subscribe_filtered_with_subscription(
                Arc::new(move |_, _| {
                    counter.fetch_add(1, Ordering::SeqCst);
                }),
                |old, new| new > old, // Accept increases only
            )
            .expect("Failed to create filtered subscription");

        // (value to set, message on failure, expected call count afterwards)
        let steps = [
            // 0 -> 5: increase, observer fires
            (5, "Failed to set property value to 5 in filtered test", 1),
            // 5 -> 3: decrease, filtered out
            (3, "Failed to set property value to 3 in filtered test", 1),
            // 3 -> 10: increase, observer fires
            (10, "Failed to set property value to 10 in filtered test", 2),
        ];
        for &(value, failure_message, expected_calls) in steps.iter() {
            prop.set(value).expect(failure_message);
            assert_eq!(call_count.load(Ordering::SeqCst), expected_calls);
        }

        // `_guard` is dropped at the closing brace
    }

    // An increase after the guard is gone must not be observed
    prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
2198
/// Three filtered guards with different predicates (increase, decrease,
/// absolute change greater than 5) on one property: each change must reach
/// exactly the observers whose predicate accepts it.
#[test]
fn test_multiple_filtered_subscriptions() {
    let prop = ObservableProperty::new(10);
    let increase_count = Arc::new(AtomicUsize::new(0));
    let decrease_count = Arc::new(AtomicUsize::new(0));
    let significant_change_count = Arc::new(AtomicUsize::new(0));

    // Fires when the value goes up
    let _increase_sub = {
        let hits = Arc::clone(&increase_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create increase subscription")
    };

    // Fires when the value goes down
    let _decrease_sub = {
        let hits = Arc::clone(&decrease_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new < old,
        )
        .expect("Failed to create decrease subscription")
    };

    // Fires when the value moves by more than 5 in either direction
    let _significant_sub = {
        let hits = Arc::clone(&significant_change_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
        )
        .expect("Failed to create significant change subscription")
    };

    // (value, failure message, expected increase / decrease / significant counts)
    let steps = [
        // 10 -> 15 (+5): increase only; a delta of exactly 5 is not "significant"
        (15, "Failed to set property to 15 in multiple filtered test", 1, 0, 0),
        // 15 -> 25 (+10): increase and significant
        (25, "Failed to set property to 25 in multiple filtered test", 2, 0, 1),
        // 25 -> 5 (-20): decrease and significant
        (5, "Failed to set property to 5 in multiple filtered test", 2, 1, 2),
        // 5 -> 3 (-2): decrease only
        (3, "Failed to set property to 3 in multiple filtered test", 2, 2, 2),
    ];

    for &(value, failure_message, increases, decreases, significant) in steps.iter() {
        prop.set(value).expect(failure_message);
        assert_eq!(increase_count.load(Ordering::SeqCst), increases);
        assert_eq!(decrease_count.load(Ordering::SeqCst), decreases);
        assert_eq!(significant_change_count.load(Ordering::SeqCst), significant);
    }

    // The three guards are released when the test function returns
}
2263
/// Exercises a multi-condition filter on an `f64` property: the observer may
/// fire only when the value moves into a different integer bucket (by `floor`)
/// AND the change is larger than 0.5.
#[test]
fn test_filtered_subscription_complex_filter() {
    let prop = ObservableProperty::new(0.0f64);
    let call_count = Arc::new(AtomicUsize::new(0));
    let values_received = Arc::new(RwLock::new(Vec::new()));
    let calls = || call_count.load(Ordering::SeqCst);

    let _subscription = {
        let counter = Arc::clone(&call_count);
        let sink = Arc::clone(&values_received);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |old, new| {
                counter.fetch_add(1, Ordering::SeqCst);
                if let Ok(mut received) = sink.write() {
                    received.push((*old, *new));
                }
            }),
            |old, new| {
                // Both conditions must hold: the integer bucket changes and
                // the absolute delta exceeds 0.5
                let previous_bucket = old.floor() as i32;
                let next_bucket = new.floor() as i32;
                previous_bucket != next_bucket && (new - old).abs() > 0.5_f64
            },
        )
        .expect("Failed to create complex filtered subscription")
    };

    // 0.0 -> 0.3 -> 0.7: same bucket throughout, nothing fires
    prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
    prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
    assert_eq!(calls(), 0);

    // 0.7 -> 1.3: new bucket and a delta of about 0.6, fires
    prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test");
    assert_eq!(calls(), 1);

    // 1.3 -> 1.9 keeps the bucket; 1.9 -> 2.1 changes bucket but the delta is
    // only about 0.2, so neither fires
    prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
    prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test");
    assert_eq!(calls(), 1);

    // 2.1 -> 3.5: new bucket and a large delta, fires
    prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
    assert_eq!(calls(), 2);

    // The observer must have been handed exactly the two accepted transitions
    let received = values_received.read().expect("Failed to read values in complex filter test");
    assert_eq!(received.len(), 2);
    assert_eq!(received[0], (0.7, 1.3));
    assert_eq!(received[1], (2.1, 3.5));
}
2314
/// `subscribe_filtered_with_subscription` must return an error once the inner
/// lock has been poisoned.
#[test]
fn test_filtered_subscription_error_handling() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Poison the lock: panic in a helper thread while holding the write guard
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    let outcome = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(outcome.is_err());
}
2331
/// An id-based filtered observer (`subscribe_filtered`) and a guard-based one
/// (`subscribe_filtered_with_subscription`) with the same increase-only filter
/// must behave identically until the id-based one is unsubscribed.
#[test]
fn test_filtered_subscription_vs_manual_filtered() {
    let prop = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));
    let manual_calls = || manual_count.load(Ordering::SeqCst);
    let auto_calls = || auto_count.load(Ordering::SeqCst);

    // Id-based filtered observer
    let manual_id = {
        let counter = Arc::clone(&manual_count);
        prop.subscribe_filtered(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create manual filtered subscription")
    };

    // Guard-based filtered observer
    let _auto_subscription = {
        let counter = Arc::clone(&auto_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create automatic filtered subscription")
    };

    // 0 -> 5: increase, both fire
    prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
    assert_eq!(manual_calls(), 1);
    assert_eq!(auto_calls(), 1);

    // 5 -> 3: decrease, neither fires
    prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
    assert_eq!(manual_calls(), 1);
    assert_eq!(auto_calls(), 1);

    // 3 -> 10: increase, both fire again
    prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
    assert_eq!(manual_calls(), 2);
    assert_eq!(auto_calls(), 2);

    // Remove the id-based observer
    prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");

    // 10 -> 15: only the guard-based observer is left to fire
    prop.set(15).expect("Failed to set property to 15 after manual cleanup");
    assert_eq!(manual_calls(), 2);
    assert_eq!(auto_calls(), 3);

    // The guard is released when the test function returns
}
2385
/// Smoke test for a filter predicate that panics on one specific value (42).
///
/// Only the first notification count is asserted. For the panicking value and
/// the update after it, the test merely requires that `set` still returns
/// `Ok`; how a filter panic affects delivery is left to the implementation.
#[test]
fn test_filtered_subscription_with_panicking_filter() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let _subscription = {
        let counter = Arc::clone(&call_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            |_, new| match *new {
                42 => panic!("Filter panic on 42"),
                _ => true, // Every other value is accepted
            },
        )
        .expect("Failed to create panicking filter subscription")
    };

    // An ordinary value passes the filter and is observed
    prop.set(1).expect("Failed to set property to 1 in panicking filter test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // The value that makes the filter panic must still be settable
    prop.set(42).expect("Failed to set property to 42 in panicking filter test");

    // The property must keep accepting updates afterwards. The resulting call
    // count is deliberately not asserted because it depends on how the
    // implementation handles the filter panic.
    prop.set(2).expect("Failed to set property to 2 after filter panic");
}
2420
/// Stress test: several threads concurrently create subscription guards,
/// update the property and drop some guards along the way.
///
/// The exact number of notifications is timing dependent and is only printed.
/// What is asserted is that every thread finishes without panicking and that,
/// once all threads (and therefore all guards) are gone, a further update
/// reaches no observer.
#[test]
fn test_subscription_thread_safety() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 50;
    let total_calls = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let prop_clone = prop.clone();
            let calls_clone = total_calls.clone();

            thread::spawn(move || {
                // Guards owned by this thread; whatever is left is dropped
                // when the vector goes out of scope at the end of the closure.
                let mut local_subscriptions = Vec::new();

                for i in 0..operations_per_thread {
                    let calls = calls_clone.clone();
                    let subscription = prop_clone
                        .subscribe_with_subscription(Arc::new(move |old, new| {
                            calls.fetch_add(1, Ordering::SeqCst);
                            // Simulate some work
                            let _ = thread_id + i + old + new;
                        }))
                        .expect("Should be able to create subscription");

                    local_subscriptions.push(subscription);

                    // Trigger observers
                    prop_clone
                        .set(thread_id * 1000 + i)
                        .expect("Should be able to set value");

                    // Every fifth iteration, drop the oldest guard still held
                    if i % 5 == 0 && !local_subscriptions.is_empty() {
                        local_subscriptions.remove(0);
                    }
                }
            })
        })
        .collect();

    // Wait for all threads to complete
    for handle in handles {
        handle.join().expect("Thread should complete successfully");
    }

    // All guards have been dropped by now, so this update must succeed and
    // must not trigger any observer.
    let calls_before = total_calls.load(Ordering::SeqCst);
    prop.set(9999).expect("Property should still work");
    assert_eq!(
        total_calls.load(Ordering::SeqCst),
        calls_before,
        "no observer should remain subscribed after all threads have finished"
    );

    println!(
        "Total observer calls: {}",
        total_calls.load(Ordering::SeqCst)
    );
}
2479
/// A subscription guard created on one thread may be moved to and dropped on
/// another thread; the observer must stay active until that drop and be
/// inactive afterwards on every thread.
#[test]
fn test_subscription_cross_thread_drop() {
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Guard is created on the test's own thread
    let subscription = {
        let counter = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cross-thread drop test")
    };

    prop.set(1).expect("Failed to set property value in cross-thread drop test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Hand the guard (plus shared handles) over to a worker thread
    let worker_prop = Arc::clone(&prop);
    let worker_calls = Arc::clone(&call_count);

    thread::spawn(move || {
        // Observer is still live when driven from the worker
        worker_prop.set(2).expect("Failed to set property value in other thread");
        assert_eq!(worker_calls.load(Ordering::SeqCst), 2);

        // Release the guard on the worker thread
        drop(subscription);

        // From here on the observer must stay silent
        worker_prop.set(3).expect("Failed to set property value after drop in other thread");
        assert_eq!(worker_calls.load(Ordering::SeqCst), 2);
    })
    .join()
    .expect("Failed to join cross-thread drop test thread");

    // The original thread sees the same: no further notifications
    prop.set(4).expect("Failed to set property value after thread join");
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
2520
/// Stress test: subscriber threads repeatedly create and drop short-lived
/// guards while setter threads keep changing the value.
///
/// The notification total depends on thread interleaving and is only printed.
/// What is asserted is that every thread completes and that, once all guards
/// are gone, a further update reaches no observer.
#[test]
fn test_concurrent_subscription_creation_and_property_changes() {
    let prop = Arc::new(ObservableProperty::new(0));
    let total_notifications = Arc::new(AtomicUsize::new(0));
    let num_subscriber_threads = 4;
    let num_setter_threads = 2;
    let operations_per_thread = 25;

    // Threads that create and destroy subscriptions
    let subscriber_handles: Vec<_> = (0..num_subscriber_threads)
        .map(|_| {
            let prop_clone = prop.clone();
            let notifications_clone = total_notifications.clone();

            thread::spawn(move || {
                for _ in 0..operations_per_thread {
                    let notifications = notifications_clone.clone();
                    let _subscription = prop_clone
                        .subscribe_with_subscription(Arc::new(move |_, _| {
                            notifications.fetch_add(1, Ordering::SeqCst);
                        }))
                        .expect("Should create subscription");

                    // Keep the guard alive briefly so setters can hit it
                    thread::sleep(Duration::from_millis(1));

                    // Guard is dropped at the end of each iteration
                }
            })
        })
        .collect();

    // Threads that continuously change the property value
    let setter_handles: Vec<_> = (0..num_setter_threads)
        .map(|thread_id| {
            let prop_clone = prop.clone();

            thread::spawn(move || {
                for i in 0..operations_per_thread * 2 {
                    prop_clone
                        .set(thread_id * 10000 + i)
                        .expect("Should set value");
                    thread::sleep(Duration::from_millis(1));
                }
            })
        })
        .collect();

    // Wait for all threads to complete; `chain` accepts any `IntoIterator`,
    // so the second vector can be passed directly.
    for handle in subscriber_handles.into_iter().chain(setter_handles) {
        handle.join().expect("Thread should complete");
    }

    // Every guard has been dropped, so this update must succeed without
    // notifying anyone.
    let notifications_before = total_notifications.load(Ordering::SeqCst);
    prop.set(99999).expect("Property should still work");
    assert_eq!(
        total_notifications.load(Ordering::SeqCst),
        notifications_before,
        "no observer should remain subscribed after all threads have finished"
    );

    println!(
        "Total notifications during concurrent test: {}",
        total_notifications.load(Ordering::SeqCst)
    );
}
2585
/// Several threads each hold an increase-only and a decrease-only filtered
/// guard while updating the shared property. After all threads have finished,
/// further updates must not produce any notification.
#[test]
fn test_filtered_subscription_thread_safety() {
    let prop = Arc::new(ObservableProperty::new(0));
    let increase_notifications = Arc::new(AtomicUsize::new(0));
    let decrease_notifications = Arc::new(AtomicUsize::new(0));
    let num_threads = 6;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_prop = Arc::clone(&prop);
        let rises = Arc::clone(&increase_notifications);
        let falls = Arc::clone(&decrease_notifications);

        workers.push(thread::spawn(move || {
            // Guard that counts only increases
            let _inc_subscription = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        rises.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new > old,
                )
                .expect("Should create filtered subscription");

            // Guard that counts only decreases
            let _dec_subscription = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        falls.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new < old,
                )
                .expect("Should create filtered subscription");

            // Values cycle through base..base+9 twice, so this thread's own
            // sequence contains both rises and falls
            let base_value = thread_id * 100;
            for step in 0..20 {
                shared_prop
                    .set(base_value + step % 10)
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }

            // Both guards are dropped when the closure returns
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // Snapshot the totals, then change the value twice more
    let initial_inc = increase_notifications.load(Ordering::SeqCst);
    let initial_dec = decrease_notifications.load(Ordering::SeqCst);

    prop.set(1000).expect("Property should still work");
    prop.set(2000).expect("Property should still work");

    // With every guard released the totals must be unchanged
    assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
    assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);

    println!(
        "Increase notifications: {}, Decrease notifications: {}",
        initial_inc, initial_dec
    );
}
2656
/// Guard-based observers must be notified by both `set` and `set_async`, and
/// two `set_async` calls are expected to return faster than two `set` calls
/// when one observer is slow.
///
/// Both guards are ordinary subscriptions; the `sync`/`async` names only tell
/// the slow observer apart from the fast one.
#[test]
fn test_subscription_with_async_property_changes() {
    let prop = Arc::new(ObservableProperty::new(0));
    let sync_notifications = Arc::new(AtomicUsize::new(0));
    let async_notifications = Arc::new(AtomicUsize::new(0));

    // Slow observer: sleeps 5 ms per notification
    let sync_count = sync_notifications.clone();
    let _sync_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            sync_count.fetch_add(1, Ordering::SeqCst);
            // Simulate slow observer work
            thread::sleep(Duration::from_millis(5));
            println!("Sync observer: {} -> {}", old, new);
        }))
        .expect("Failed to create sync subscription");

    // Fast observer
    let async_count = async_notifications.clone();
    let _async_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            async_count.fetch_add(1, Ordering::SeqCst);
            println!("Async observer: {} -> {}", old, new);
        }))
        .expect("Failed to create async subscription");

    // Time two blocking updates
    let start = std::time::Instant::now();
    prop.set(1).expect("Failed to set property value 1 in async test");
    prop.set(2).expect("Failed to set property value 2 in async test");
    let sync_duration = start.elapsed();

    // Time two non-blocking updates
    let start = std::time::Instant::now();
    prop.set_async(3).expect("Failed to set property value 3 async");
    prop.set_async(4).expect("Failed to set property value 4 async");
    let async_duration = start.elapsed();

    // Async should be much faster
    assert!(async_duration < sync_duration);

    // Wait for the background notifications with a bounded poll rather than a
    // fixed sleep, so a slow or loaded machine does not cause a spurious
    // failure while a healthy run still finishes quickly.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while (sync_notifications.load(Ordering::SeqCst) < 4
        || async_notifications.load(Ordering::SeqCst) < 4)
        && std::time::Instant::now() < deadline
    {
        thread::sleep(Duration::from_millis(5));
    }

    // Each observer must have seen all four updates (two sync + two async)
    assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
    assert_eq!(async_notifications.load(Ordering::SeqCst), 4);

    // Guards are released when the test function returns
}
2707
/// After the inner lock is poisoned, creating new plain or filtered
/// subscription guards must fail, and dropping a guard created before the
/// poisoning must not panic.
#[test]
fn test_subscription_creation_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));

    // A guard obtained while the lock is still healthy
    let counter = Arc::new(AtomicUsize::new(0));
    let existing_subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poisoning");

    // Poison the lock: panic in a helper thread while holding the write guard
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner.inner.write().expect("Failed to acquire write lock for subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Plain subscription attempt is rejected
    let plain_attempt = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(plain_attempt.is_err());

    // Filtered subscription attempt is rejected as well
    let filtered_attempt =
        prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(filtered_attempt.is_err());

    // Releasing the pre-existing guard must complete without panicking
    drop(existing_subscription);
}
2741
#[test]
fn test_subscription_cleanup_behavior_with_poisoned_lock() {
    // Checks that dropping a subscription after the inner lock has been
    // poisoned does not panic. Reaching the end of the function is the
    // success condition.
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register a counting observer.
    let subscription = {
        let counter = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cleanup behavior test")
    };

    // Sanity check: the observer fires once for a single update.
    prop.set(1)
        .expect("Failed to set property value in cleanup behavior test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Panic on a helper thread while it holds the write guard; the join
    // result is deliberately ignored because the panic is expected.
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for cleanup behavior poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // The actual check: this drop must complete without panicking even
    // though the lock is poisoned.
    drop(subscription);
}
2774
#[test]
fn test_multiple_subscription_cleanup_with_poisoned_lock() {
    // Creates several subscriptions, confirms each observer is notified,
    // poisons the inner lock, and then checks that dropping every
    // subscription completes without panicking.
    let prop = Arc::new(ObservableProperty::new(0));
    let mut subscriptions = Vec::new();
    // One counter per observer, kept so the notification step below can
    // actually be verified instead of being discarded inside the loop.
    let mut call_counts = Vec::new();

    // Create multiple subscriptions
    for i in 0..5 {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count = Arc::clone(&call_count);
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}: {} -> {}", i, old, new);
            }))
            .expect("Failed to create subscription in multiple cleanup test");
        subscriptions.push(subscription);
        call_counts.push(call_count);
    }

    // Verify they all work: a single update must reach every observer once.
    prop.set(42)
        .expect("Failed to set property value in multiple cleanup test");
    for (index, call_count) in call_counts.iter().enumerate() {
        assert_eq!(
            call_count.load(Ordering::SeqCst),
            1,
            "observer {} should have been notified exactly once",
            index
        );
    }

    // Poison the lock by panicking on another thread while it holds the
    // write guard; the join result is ignored because the panic is expected.
    let prop_clone = Arc::clone(&prop);
    let poison_thread = thread::spawn(move || {
        let _guard = prop_clone
            .inner
            .write()
            .expect("Failed to acquire write lock for multiple cleanup poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poison_thread.join();

    // Drop all subscriptions - none should panic
    for subscription in subscriptions {
        drop(subscription);
    }

    // Test succeeds if no panics occurred
}
2811
#[test]
fn test_subscription_behavior_before_and_after_poison() {
    // Contrasts a subscription made before the inner lock is poisoned
    // (works, and later drops cleanly) with one attempted afterwards
    // (rejected, so its observer never runs).
    let prop = Arc::new(ObservableProperty::new(0));
    let before_poison_count = Arc::new(AtomicUsize::new(0));
    let after_poison_count = Arc::new(AtomicUsize::new(0));

    // Subscribe while the lock is healthy.
    let before_subscription = {
        let counter = Arc::clone(&before_poison_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poison test")
    };

    // The pre-poison observer fires once for a single update.
    prop.set(1)
        .expect("Failed to set property value before poison test");
    assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);

    // Panic on a helper thread while it holds the write guard; the join
    // result is deliberately ignored because the panic is expected.
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for before/after poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Subscribing after the poison must be rejected.
    let late_counter = Arc::clone(&after_poison_count);
    let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
        late_counter.fetch_add(1, Ordering::SeqCst);
    }));
    assert!(after_result.is_err());

    // Dropping the pre-poison subscription must not panic.
    drop(before_subscription);

    // The rejected observer was never invoked.
    assert_eq!(after_poison_count.load(Ordering::SeqCst), 0);
}
2851
#[test]
fn test_concurrent_subscription_drops_with_poison() {
    // Creates a batch of subscriptions, poisons the inner lock, and then
    // drops each subscription on its own thread. The test passes when every
    // dropping thread joins without having panicked.
    let prop = Arc::new(ObservableProperty::new(0));
    let num_subscriptions = 10;
    let mut subscriptions = Vec::new();

    // Register the observers.
    for i in 0..num_subscriptions {
        let call_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&call_count);
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}", i);
            }))
            .expect("Failed to create subscription in concurrent drops test");
        subscriptions.push(subscription);
    }

    // Panic on a helper thread while it holds the write guard; the join
    // result is deliberately ignored because the panic is expected.
    let poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for concurrent drops poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Hand each subscription to its own thread and drop it there.
    let mut handles = Vec::with_capacity(subscriptions.len());
    for (i, subscription) in subscriptions.into_iter().enumerate() {
        handles.push(thread::spawn(move || {
            // Stagger the drops by 0-4 ms (index modulo 5) so they do not
            // all start at the same instant.
            let stagger = Duration::from_millis((i as u64) % 5);
            thread::sleep(stagger);
            drop(subscription);
            println!("Dropped subscription {}", i);
        }));
    }

    // Every dropping thread must finish without panicking.
    handles.into_iter().for_each(|handle| {
        handle
            .join()
            .expect("Drop thread should complete without panic");
    });
}
2902}