observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
15//!
16//! ## Quick Start
17//!
18//! ```rust
19//! use observable_property::ObservableProperty;
20//! use std::sync::Arc;
21//!
22//! // Create an observable property
23//! let property = ObservableProperty::new(42);
24//!
25//! // Subscribe to changes
26//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
27//! println!("Value changed from {} to {}", old_value, new_value);
28//! })).map_err(|e| {
29//! eprintln!("Failed to subscribe: {}", e);
30//! e
31//! })?;
32//!
33//! // Change the value (triggers observer)
34//! property.set(100).map_err(|e| {
35//! eprintln!("Failed to set value: {}", e);
36//! e
37//! })?;
38//!
39//! // Unsubscribe when done
40//! property.unsubscribe(observer_id).map_err(|e| {
41//! eprintln!("Failed to unsubscribe: {}", e);
42//! e
43//! })?;
44//! # Ok::<(), observable_property::PropertyError>(())
45//! ```
46//!
47//! ## Multi-threading Example
48//!
49//! ```rust
50//! use observable_property::ObservableProperty;
51//! use std::sync::Arc;
52//! use std::thread;
53//!
54//! let property = Arc::new(ObservableProperty::new(0));
55//! let property_clone = property.clone();
56//!
57//! // Subscribe from one thread
58//! property.subscribe(Arc::new(|old, new| {
59//! println!("Value changed: {} -> {}", old, new);
60//! })).map_err(|e| {
61//! eprintln!("Failed to subscribe: {}", e);
62//! e
63//! })?;
64//!
65//! // Modify from another thread
66//! thread::spawn(move || {
67//! if let Err(e) = property_clone.set(42) {
68//! eprintln!("Failed to set value: {}", e);
69//! }
70//! }).join().expect("Thread panicked");
71//! # Ok::<(), observable_property::PropertyError>(())
72//! ```
73//!
74//! ## RAII Subscriptions (Recommended)
75//!
76//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
77//!
78//! ```rust
79//! use observable_property::ObservableProperty;
80//! use std::sync::Arc;
81//!
82//! # fn main() -> Result<(), observable_property::PropertyError> {
83//! let property = ObservableProperty::new(0);
84//!
85//! {
86//! // Create RAII subscription - automatically cleaned up when dropped
87//! let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
88//! println!("Value changed: {} -> {}", old, new);
89//! }))?;
90//!
91//! property.set(42)?; // Prints: "Value changed: 0 -> 42"
92//!
93//! // Subscription automatically unsubscribes when leaving this scope
94//! }
95//!
96//! // No observer active anymore
97//! property.set(100)?; // No output
98//! # Ok(())
99//! # }
100//! ```
101//!
102//! ## Filtered RAII Subscriptions
103//!
104//! Combine filtering with automatic cleanup for conditional monitoring:
105//!
106//! ```rust
107//! use observable_property::ObservableProperty;
108//! use std::sync::Arc;
109//!
110//! # fn main() -> Result<(), observable_property::PropertyError> {
111//! let temperature = ObservableProperty::new(20.0);
112//!
113//! {
114//! // Monitor only significant temperature increases with automatic cleanup
115//! let _heat_warning = temperature.subscribe_filtered_with_subscription(
116//! Arc::new(|old, new| {
117//! println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
118//! }),
119//! |old, new| new > old && (new - old) > 5.0
120//! )?;
121//!
122//! temperature.set(22.0)?; // No warning (only 2°C increase)
123//! temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
124//!
125//! // Subscription automatically cleaned up here
126//! }
127//!
128//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
129//! # Ok(())
130//! # }
131//! ```
132//!
133//! ## Subscription Management Comparison
134//!
135//! ```rust
136//! use observable_property::ObservableProperty;
137//! use std::sync::Arc;
138//!
139//! # fn main() -> Result<(), observable_property::PropertyError> {
140//! let property = ObservableProperty::new(0);
141//! let observer = Arc::new(|old: &i32, new: &i32| {
142//! println!("Value: {} -> {}", old, new);
143//! });
144//!
145//! // Method 1: Manual subscription management
146//! let observer_id = property.subscribe(observer.clone())?;
147//! property.set(42)?;
148//! property.unsubscribe(observer_id)?; // Manual cleanup required
149//!
150//! // Method 2: RAII subscription management (recommended)
151//! {
152//! let _subscription = property.subscribe_with_subscription(observer)?;
153//! property.set(100)?;
154//! // Automatic cleanup when _subscription goes out of scope
155//! }
156//! # Ok(())
157//! # }
158//! ```
159//!
160//! ## Advanced RAII Patterns
161//!
162//! Comprehensive example showing various RAII subscription patterns:
163//!
164//! ```rust
165//! use observable_property::ObservableProperty;
166//! use std::sync::Arc;
167//!
168//! # fn main() -> Result<(), observable_property::PropertyError> {
169//! // System monitoring example
170//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
171//! let memory_usage = ObservableProperty::new(1024); // MB
172//! let active_connections = ObservableProperty::new(0u32);
173//!
174//! // Conditional monitoring based on system state
175//! let high_load_monitoring = cpu_usage.get()? > 50.0;
176//!
177//! if high_load_monitoring {
178//! // Critical system monitoring - active only during high load
179//! let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
180//! Arc::new(|old, new| {
181//! println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
182//! }),
183//! |_, new| *new > 80.0
184//! )?;
185//!
186//! let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
187//! Arc::new(|old, new| {
188//! println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
189//! }),
190//! |_, new| *new > 8192 // > 8GB
191//! )?;
192//!
193//! // Simulate system load changes
194//! cpu_usage.set(85.0)?; // Would trigger critical alert
195//! memory_usage.set(9216)?; // Would trigger memory warning
196//!
197//! // All monitoring automatically stops when exiting this block
198//! }
199//!
200//! // Connection monitoring with scoped lifetime
201//! {
202//! let _connection_monitor = active_connections.subscribe_with_subscription(
203//! Arc::new(|old, new| {
204//! if new > old {
205//! println!("📈 New connections: {} -> {}", old, new);
206//! } else if new < old {
207//! println!("📉 Connections closed: {} -> {}", old, new);
208//! }
209//! })
210//! )?;
211//!
212//! active_connections.set(5)?; // Prints: "📈 New connections: 0 -> 5"
213//! active_connections.set(3)?; // Prints: "📉 Connections closed: 5 -> 3"
214//! active_connections.set(8)?; // Prints: "📈 New connections: 3 -> 8"
215//!
216//! // Connection monitoring automatically stops here
217//! }
218//!
219//! // No monitoring active anymore
220//! cpu_usage.set(95.0)?; // No output
221//! memory_usage.set(10240)?; // No output
222//! active_connections.set(15)?; // No output
223//!
224//! println!("All monitoring automatically cleaned up!");
225//! # Ok(())
226//! # }
227//! ```
228
229use std::collections::HashMap;
230use std::fmt;
231use std::panic;
232use std::sync::{Arc, RwLock};
233use std::thread;
234
235/// Maximum number of background threads used for asynchronous observer notifications
236///
237/// This constant controls the degree of parallelism when using `set_async()` to notify
238/// observers. The observer list is divided into batches, with each batch running in
239/// its own background thread, up to this maximum number of threads.
240///
241/// # Rationale
242///
243/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
244/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead
245/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
246/// - **System Responsiveness**: Limits thread contention on multi-core systems
247///
248/// # Implementation Details
249///
250/// When `set_async()` is called:
251/// 1. All observers are collected into a snapshot
252/// 2. Observers are divided into `MAX_THREADS` batches (or fewer if there are fewer observers)
253/// 3. Each batch executes in its own `thread::spawn()` call
254/// 4. Observers within each batch are executed sequentially
255///
256/// For example, with 100 observers and `MAX_THREADS = 4`:
257/// - Batch 1: Observers 1-25 (Thread 1)
258/// - Batch 2: Observers 26-50 (Thread 2)
259/// - Batch 3: Observers 51-75 (Thread 3)
260/// - Batch 4: Observers 76-100 (Thread 4)
261///
262/// # Tuning Considerations
263///
264/// This value can be adjusted based on your application's needs:
265/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
266/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
267/// - **Memory-constrained systems**: Lower values reduce thread overhead
268/// - **Real-time systems**: Lower values reduce scheduling unpredictability
269///
270/// # Thread Safety
271///
272/// This constant is used only during the batching calculation and does not affect
273/// the thread safety of the overall system.
274const MAX_THREADS: usize = 4;
275/// Errors that can occur when working with ObservableProperty
276#[derive(Debug, Clone)]
277pub enum PropertyError {
278 /// Failed to acquire a read lock on the property
279 ReadLockError {
280 /// Context describing what operation was being attempted
281 context: String,
282 },
283 /// Failed to acquire a write lock on the property
284 WriteLockError {
285 /// Context describing what operation was being attempted
286 context: String,
287 },
288 /// Attempted to unsubscribe an observer that doesn't exist
289 ObserverNotFound {
290 /// The ID of the observer that wasn't found
291 id: usize,
292 },
293 /// The property's lock has been poisoned due to a panic in another thread
294 PoisonedLock,
295 /// An observer function encountered an error during execution
296 ObserverError {
297 /// Description of what went wrong
298 reason: String,
299 },
300 /// The thread pool for async notifications is exhausted
301 ThreadPoolExhausted,
302 /// Invalid configuration was provided
303 InvalidConfiguration {
304 /// Description of the invalid configuration
305 reason: String,
306 },
307}
308
309impl fmt::Display for PropertyError {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 match self {
312 PropertyError::ReadLockError { context } => {
313 write!(f, "Failed to acquire read lock: {}", context)
314 }
315 PropertyError::WriteLockError { context } => {
316 write!(f, "Failed to acquire write lock: {}", context)
317 }
318 PropertyError::ObserverNotFound { id } => {
319 write!(f, "Observer with ID {} not found", id)
320 }
321 PropertyError::PoisonedLock => {
322 write!(
323 f,
324 "Property is in a poisoned state due to a panic in another thread"
325 )
326 }
327 PropertyError::ObserverError { reason } => {
328 write!(f, "Observer execution failed: {}", reason)
329 }
330 PropertyError::ThreadPoolExhausted => {
331 write!(f, "Thread pool is exhausted and cannot spawn more observers")
332 }
333 PropertyError::InvalidConfiguration { reason } => {
334 write!(f, "Invalid configuration: {}", reason)
335 }
336 }
337 }
338}
339
340impl std::error::Error for PropertyError {}
341
342/// Function type for observers that get called when property values change
343pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
344
345/// Unique identifier for registered observers
346pub type ObserverId = usize;
347
348/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
349///
350/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
351/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
352/// `Drop` implementation automatically removes the associated observer from the property.
353///
354/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
355/// leaks in scenarios where observers might otherwise be forgotten.
356///
357/// # Type Requirements
358///
359/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
360/// - `Clone`: Required for observer notifications
361/// - `Send`: Required for transferring between threads
362/// - `Sync`: Required for concurrent access from multiple threads
363/// - `'static`: Required for observer callbacks that may outlive the original scope
364///
365/// # Examples
366///
367/// ## Basic RAII Subscription
368///
369/// ```rust
370/// use observable_property::ObservableProperty;
371/// use std::sync::Arc;
372///
373/// # fn main() -> Result<(), observable_property::PropertyError> {
374/// let property = ObservableProperty::new(0);
375///
376/// {
377/// // Create subscription - observer is automatically registered
378/// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
379/// println!("Value changed: {} -> {}", old, new);
380/// }))?;
381///
382/// property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
383///
384/// // When _subscription goes out of scope here, observer is automatically removed
385/// }
386///
387/// property.set(100)?; // No observer output - subscription was automatically cleaned up
388/// # Ok(())
389/// # }
390/// ```
391///
392/// ## Cross-Thread Subscription Management
393///
394/// ```rust
395/// use observable_property::ObservableProperty;
396/// use std::sync::Arc;
397/// use std::thread;
398///
399/// # fn main() -> Result<(), observable_property::PropertyError> {
400/// let property = Arc::new(ObservableProperty::new(0));
401/// let property_clone = property.clone();
402///
403/// // Create subscription in main thread
404/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
405/// println!("Observed: {} -> {}", old, new);
406/// }))?;
407///
408/// // Move subscription to another thread for cleanup
409/// let handle = thread::spawn(move || {
410/// // Subscription is still active here
411/// let _ = property_clone.set(42); // Will trigger observer
412///
413/// // When subscription is dropped here (end of thread), observer is cleaned up
414/// drop(subscription);
415/// });
416///
417/// handle.join().unwrap();
418///
419/// // Observer is no longer active
420/// property.set(100)?; // No output
421/// # Ok(())
422/// # }
423/// ```
424///
425/// ## Conditional Scoped Subscriptions
426///
427/// ```rust
428/// use observable_property::ObservableProperty;
429/// use std::sync::Arc;
430///
431/// # fn main() -> Result<(), observable_property::PropertyError> {
432/// let counter = ObservableProperty::new(0);
433/// let debug_mode = true;
434///
435/// if debug_mode {
436/// let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
437/// println!("Debug: counter {} -> {}", old, new);
438/// }))?;
439///
440/// counter.set(1)?; // Prints debug info
441/// counter.set(2)?; // Prints debug info
442///
443/// // Debug subscription automatically cleaned up when exiting if block
444/// }
445///
446/// counter.set(3)?; // No debug output (subscription was cleaned up)
447/// # Ok(())
448/// # }
449/// ```
450///
451/// # Thread Safety
452///
453/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
454/// sent between threads and the automatic cleanup will work correctly even if the
455/// subscription is dropped from a different thread than where it was created.
456pub struct Subscription<T: Clone + Send + Sync + 'static> {
457 inner: Arc<RwLock<InnerProperty<T>>>,
458 id: ObserverId,
459}
460
461impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
462 /// Debug implementation that shows the subscription ID without exposing internals
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 f.debug_struct("Subscription")
465 .field("id", &self.id)
466 .field("inner", &"[ObservableProperty]")
467 .finish()
468 }
469}
470
471impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
472 /// Automatically removes the associated observer when the subscription is dropped
473 ///
474 /// This implementation provides automatic cleanup by removing the observer
475 /// from the property's observer list when the `Subscription` goes out of scope.
476 ///
477 /// # Error Handling
478 ///
479 /// If the property's lock is poisoned or inaccessible during cleanup, the error
480 /// is silently ignored using the `let _ = ...` pattern. This is intentional
481 /// because:
482 /// 1. Drop implementations should not panic
483 /// 2. If the property is poisoned, it's likely unusable anyway
484 /// 3. There's no meaningful way to handle cleanup errors in a destructor
485 ///
486 /// # Thread Safety
487 ///
488 /// This method is safe to call from any thread, even if the subscription
489 /// was created on a different thread.
490 fn drop(&mut self) {
491 let _ = self.inner.write().map(|mut prop| {
492 prop.observers.remove(&self.id);
493 });
494 }
495}
496
497/// A thread-safe observable property that notifies observers when its value changes
498///
499/// This type wraps a value of type `T` and allows multiple observers to be notified
500/// whenever the value is modified. All operations are thread-safe and can be called
501/// from multiple threads concurrently.
502///
503/// # Type Requirements
504///
505/// The generic type `T` must implement:
506/// - `Clone`: Required for returning values and passing them to observers
507/// - `Send`: Required for transferring between threads
508/// - `Sync`: Required for concurrent access from multiple threads
509/// - `'static`: Required for observer callbacks that may outlive the original scope
510///
511/// # Examples
512///
513/// ```rust
514/// use observable_property::ObservableProperty;
515/// use std::sync::Arc;
516///
517/// let property = ObservableProperty::new("initial".to_string());
518///
519/// let observer_id = property.subscribe(Arc::new(|old, new| {
520/// println!("Changed from '{}' to '{}'", old, new);
521/// })).map_err(|e| {
522/// eprintln!("Failed to subscribe: {}", e);
523/// e
524/// })?;
525///
526/// property.set("updated".to_string()).map_err(|e| {
527/// eprintln!("Failed to set value: {}", e);
528/// e
529/// })?; // Prints: Changed from 'initial' to 'updated'
530///
531/// property.unsubscribe(observer_id).map_err(|e| {
532/// eprintln!("Failed to unsubscribe: {}", e);
533/// e
534/// })?;
535/// # Ok::<(), observable_property::PropertyError>(())
536/// ```
537pub struct ObservableProperty<T> {
538 inner: Arc<RwLock<InnerProperty<T>>>,
539 max_threads: usize,
540}
541
542struct InnerProperty<T> {
543 value: T,
544 observers: HashMap<ObserverId, Observer<T>>,
545 next_id: ObserverId,
546}
547
548impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
549 /// Creates a new observable property with the given initial value
550 ///
551 /// # Arguments
552 ///
553 /// * `initial_value` - The starting value for this property
554 ///
555 /// # Examples
556 ///
557 /// ```rust
558 /// use observable_property::ObservableProperty;
559 ///
560 /// let property = ObservableProperty::new(42);
561 /// match property.get() {
562 /// Ok(value) => assert_eq!(value, 42),
563 /// Err(e) => eprintln!("Failed to get property value: {}", e),
564 /// }
565 /// ```
566 pub fn new(initial_value: T) -> Self {
567 Self {
568 inner: Arc::new(RwLock::new(InnerProperty {
569 value: initial_value,
570 observers: HashMap::new(),
571 next_id: 0,
572 })),
573 max_threads: MAX_THREADS,
574 }
575 }
576
577 /// Creates a new observable property with a custom maximum thread count for async notifications
578 ///
579 /// This constructor allows you to customize the maximum number of threads used for
580 /// asynchronous observer notifications via `set_async()`. This is useful for tuning
581 /// performance based on your specific use case and system constraints.
582 ///
583 /// # Arguments
584 ///
585 /// * `initial_value` - The starting value for this property
586 /// * `max_threads` - Maximum number of threads to use for async notifications.
587 /// If 0 is provided, defaults to 4.
588 ///
589 /// # Thread Pool Behavior
590 ///
591 /// When `set_async()` is called, observers are divided into batches and each batch
592 /// runs in its own thread, up to the specified maximum. For example:
593 /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
    /// - With 10 observers and `max_threads = 8`: 5 threads with 2 observers each
595 /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
596 ///
597 /// # Use Cases
598 ///
599 /// ## High-Throughput Systems
600 /// ```rust
601 /// use observable_property::ObservableProperty;
602 ///
603 /// // For systems with many CPU cores and CPU-bound observers
604 /// let property = ObservableProperty::with_max_threads(0, 8);
605 /// ```
606 ///
607 /// ## Resource-Constrained Systems
608 /// ```rust
609 /// use observable_property::ObservableProperty;
610 ///
611 /// // For embedded systems or memory-constrained environments
612 /// let property = ObservableProperty::with_max_threads(42, 1);
613 /// ```
614 ///
615 /// ## I/O-Heavy Observers
616 /// ```rust
617 /// use observable_property::ObservableProperty;
618 ///
619 /// // For observers that do network/database operations
620 /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
621 /// ```
622 ///
623 /// # Performance Considerations
624 ///
625 /// - **Higher values**: Better parallelism but more thread overhead and memory usage
626 /// - **Lower values**: Less overhead but potentially slower async notifications
627 /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
628 /// - **Zero value**: Automatically uses the default value (4)
629 ///
630 /// # Thread Safety
631 ///
632 /// This setting only affects async notifications (`set_async()`). Synchronous
633 /// operations (`set()`) always execute observers sequentially regardless of this setting.
634 ///
635 /// # Examples
636 ///
637 /// ```rust
638 /// use observable_property::ObservableProperty;
639 /// use std::sync::Arc;
640 ///
641 /// // Create property with custom thread pool size
642 /// let property = ObservableProperty::with_max_threads(42, 2);
643 ///
644 /// // Subscribe observers as usual
645 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
646 /// println!("Value changed: {} -> {}", old, new);
647 /// })).expect("Failed to create subscription");
648 ///
649 /// // Async notifications will use at most 2 threads
650 /// property.set_async(100).expect("Failed to set value asynchronously");
651 /// ```
652 pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
653 let max_threads = if max_threads == 0 {
654 MAX_THREADS
655 } else {
656 max_threads
657 };
658 Self {
659 inner: Arc::new(RwLock::new(InnerProperty {
660 value: initial_value,
661 observers: HashMap::new(),
662 next_id: 0,
663 })),
664 max_threads,
665 }
666 }
667
668 /// Gets the current value of the property
669 ///
670 /// This method acquires a read lock, which allows multiple concurrent readers
671 /// but will block if a writer currently holds the lock.
672 ///
673 /// # Returns
674 ///
675 /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
676 /// if the lock is poisoned.
677 ///
678 /// # Examples
679 ///
680 /// ```rust
681 /// use observable_property::ObservableProperty;
682 ///
683 /// let property = ObservableProperty::new("hello".to_string());
684 /// match property.get() {
685 /// Ok(value) => assert_eq!(value, "hello"),
686 /// Err(e) => eprintln!("Failed to get property value: {}", e),
687 /// }
688 /// ```
689 pub fn get(&self) -> Result<T, PropertyError> {
690 self.inner
691 .read()
692 .map(|prop| prop.value.clone())
693 .map_err(|_| PropertyError::PoisonedLock)
694 }
695
696 /// Sets the property to a new value and notifies all observers
697 ///
698 /// This method will:
699 /// 1. Acquire a write lock (blocking other readers/writers)
700 /// 2. Update the value and capture a snapshot of observers
701 /// 3. Release the lock
702 /// 4. Notify all observers sequentially with the old and new values
703 ///
704 /// Observer notifications are wrapped in panic recovery to prevent one
705 /// misbehaving observer from affecting others.
706 ///
707 /// # Arguments
708 ///
709 /// * `new_value` - The new value to set
710 ///
711 /// # Returns
712 ///
713 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
714 ///
715 /// # Examples
716 ///
717 /// ```rust
718 /// use observable_property::ObservableProperty;
719 /// use std::sync::Arc;
720 ///
721 /// let property = ObservableProperty::new(10);
722 ///
723 /// property.subscribe(Arc::new(|old, new| {
724 /// println!("Value changed from {} to {}", old, new);
725 /// })).map_err(|e| {
726 /// eprintln!("Failed to subscribe: {}", e);
727 /// e
728 /// })?;
729 ///
730 /// property.set(20).map_err(|e| {
731 /// eprintln!("Failed to set property value: {}", e);
732 /// e
733 /// })?; // Triggers observer notification
734 /// # Ok::<(), observable_property::PropertyError>(())
735 /// ```
736 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
737 let (old_value, observers_snapshot) = {
738 let mut prop = self
739 .inner
740 .write()
741 .map_err(|_| PropertyError::WriteLockError {
742 context: "setting property value".to_string(),
743 })?;
744
745 let old_value = prop.value.clone();
746 prop.value = new_value.clone();
747 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
748 (old_value, observers_snapshot)
749 };
750
751 for observer in observers_snapshot {
752 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
753 observer(&old_value, &new_value);
754 })) {
755 eprintln!("Observer panic: {:?}", e);
756 }
757 }
758
759 Ok(())
760 }
761
762 /// Sets the property to a new value and notifies observers asynchronously
763 ///
764 /// This method is similar to `set()` but spawns observers in background threads
765 /// for non-blocking operation. This is useful when observers might perform
766 /// time-consuming operations.
767 ///
768 /// Observers are batched into groups and each batch runs in its own thread
769 /// to limit resource usage while still providing parallelism.
770 ///
771 /// # Arguments
772 ///
773 /// * `new_value` - The new value to set
774 ///
775 /// # Returns
776 ///
777 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
778 /// Note that this only indicates the property was updated successfully;
779 /// observer execution happens asynchronously.
780 ///
781 /// # Examples
782 ///
783 /// ```rust
784 /// use observable_property::ObservableProperty;
785 /// use std::sync::Arc;
786 /// use std::time::Duration;
787 ///
788 /// let property = ObservableProperty::new(0);
789 ///
790 /// property.subscribe(Arc::new(|old, new| {
791 /// // This observer does slow work but won't block the caller
792 /// std::thread::sleep(Duration::from_millis(100));
793 /// println!("Slow observer: {} -> {}", old, new);
794 /// })).map_err(|e| {
795 /// eprintln!("Failed to subscribe: {}", e);
796 /// e
797 /// })?;
798 ///
799 /// // This returns immediately even though observer is slow
800 /// property.set_async(42).map_err(|e| {
801 /// eprintln!("Failed to set value asynchronously: {}", e);
802 /// e
803 /// })?;
804 /// # Ok::<(), observable_property::PropertyError>(())
805 /// ```
806 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
807 let (old_value, observers_snapshot) = {
808 let mut prop = self
809 .inner
810 .write()
811 .map_err(|_| PropertyError::WriteLockError {
812 context: "setting property value".to_string(),
813 })?;
814
815 let old_value = prop.value.clone();
816 prop.value = new_value.clone();
817 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
818 (old_value, observers_snapshot)
819 };
820
821 if observers_snapshot.is_empty() {
822 return Ok(());
823 }
824
825 let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
826
827 for batch in observers_snapshot.chunks(observers_per_thread) {
828 let batch_observers = batch.to_vec();
829 let old_val = old_value.clone();
830 let new_val = new_value.clone();
831
832 thread::spawn(move || {
833 for observer in batch_observers {
834 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
835 observer(&old_val, &new_val);
836 })) {
837 eprintln!("Observer panic in batch thread: {:?}", e);
838 }
839 }
840 });
841 }
842
843 Ok(())
844 }
845
846 /// Subscribes an observer function to be called when the property changes
847 ///
848 /// The observer function will be called with the old and new values whenever
849 /// the property is modified via `set()` or `set_async()`.
850 ///
851 /// # Arguments
852 ///
853 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
854 ///
855 /// # Returns
856 ///
857 /// `Ok(ObserverId)` containing a unique identifier for this observer,
858 /// or `Err(PropertyError)` if the lock is poisoned.
859 ///
860 /// # Examples
861 ///
862 /// ```rust
863 /// use observable_property::ObservableProperty;
864 /// use std::sync::Arc;
865 ///
866 /// let property = ObservableProperty::new(0);
867 ///
868 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
869 /// println!("Property changed from {} to {}", old_value, new_value);
870 /// })).map_err(|e| {
871 /// eprintln!("Failed to subscribe observer: {}", e);
872 /// e
873 /// })?;
874 ///
875 /// // Later, unsubscribe using the returned ID
876 /// property.unsubscribe(observer_id).map_err(|e| {
877 /// eprintln!("Failed to unsubscribe observer: {}", e);
878 /// e
879 /// })?;
880 /// # Ok::<(), observable_property::PropertyError>(())
881 /// ```
882 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
883 let mut prop = self
884 .inner
885 .write()
886 .map_err(|_| PropertyError::WriteLockError {
887 context: "subscribing observer".to_string(),
888 })?;
889
890 let id = prop.next_id;
891 prop.next_id += 1;
892 prop.observers.insert(id, observer);
893 Ok(id)
894 }
895
896 /// Removes an observer identified by its ID
897 ///
898 /// # Arguments
899 ///
900 /// * `id` - The observer ID returned by `subscribe()`
901 ///
902 /// # Returns
903 ///
904 /// `Ok(bool)` where `true` means the observer was found and removed,
905 /// `false` means no observer with that ID existed.
906 /// Returns `Err(PropertyError)` if the lock is poisoned.
907 ///
908 /// # Examples
909 ///
910 /// ```rust
911 /// use observable_property::ObservableProperty;
912 /// use std::sync::Arc;
913 ///
914 /// let property = ObservableProperty::new(0);
915 /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
916 /// eprintln!("Failed to subscribe: {}", e);
917 /// e
918 /// })?;
919 ///
920 /// let was_removed = property.unsubscribe(id).map_err(|e| {
921 /// eprintln!("Failed to unsubscribe: {}", e);
922 /// e
923 /// })?;
924 /// assert!(was_removed); // Observer existed and was removed
925 ///
926 /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
927 /// eprintln!("Failed to unsubscribe again: {}", e);
928 /// e
929 /// })?;
930 /// assert!(!was_removed_again); // Observer no longer exists
931 /// # Ok::<(), observable_property::PropertyError>(())
932 /// ```
933 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
934 let mut prop = self
935 .inner
936 .write()
937 .map_err(|_| PropertyError::WriteLockError {
938 context: "unsubscribing observer".to_string(),
939 })?;
940
941 let was_present = prop.observers.remove(&id).is_some();
942 Ok(was_present)
943 }
944
945 /// Subscribes an observer that only gets called when a filter condition is met
946 ///
947 /// This is useful for observing only specific types of changes, such as
948 /// when a value increases or crosses a threshold.
949 ///
950 /// # Arguments
951 ///
952 /// * `observer` - The observer function to call when the filter passes
953 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
954 ///
955 /// # Returns
956 ///
957 /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
958 ///
959 /// # Examples
960 ///
961 /// ```rust
962 /// use observable_property::ObservableProperty;
963 /// use std::sync::Arc;
964 ///
965 /// let property = ObservableProperty::new(0);
966 ///
967 /// // Only notify when value increases
968 /// let id = property.subscribe_filtered(
969 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
970 /// |old, new| new > old
971 /// ).map_err(|e| {
972 /// eprintln!("Failed to subscribe filtered observer: {}", e);
973 /// e
974 /// })?;
975 ///
976 /// property.set(10).map_err(|e| {
977 /// eprintln!("Failed to set value: {}", e);
978 /// e
979 /// })?; // Triggers observer (0 -> 10)
980 /// property.set(5).map_err(|e| {
981 /// eprintln!("Failed to set value: {}", e);
982 /// e
983 /// })?; // Does NOT trigger observer (10 -> 5)
984 /// property.set(15).map_err(|e| {
985 /// eprintln!("Failed to set value: {}", e);
986 /// e
987 /// })?; // Triggers observer (5 -> 15)
988 /// # Ok::<(), observable_property::PropertyError>(())
989 /// ```
990 pub fn subscribe_filtered<F>(
991 &self,
992 observer: Observer<T>,
993 filter: F,
994 ) -> Result<ObserverId, PropertyError>
995 where
996 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
997 {
998 let filter = Arc::new(filter);
999 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1000 if filter(old_val, new_val) {
1001 observer(old_val, new_val);
1002 }
1003 });
1004
1005 self.subscribe(filtered_observer)
1006 }
1007
1008 pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1009 let prop = self
1010 .inner
1011 .read()
1012 .map_err(|_| PropertyError::ReadLockError {
1013 context: "notifying observers".to_string(),
1014 })?;
1015
1016 for (old_val, new_val) in changes {
1017 for observer in prop.observers.values() {
1018 observer(&old_val, &new_val);
1019 }
1020 }
1021 Ok(())
1022 }
1023
1024 /// Subscribes an observer and returns a RAII guard for automatic cleanup
1025 ///
1026 /// This method is similar to `subscribe()` but returns a `Subscription` object
1027 /// that automatically removes the observer when it goes out of scope. This
1028 /// provides a more convenient and safer alternative to manual subscription
1029 /// management.
1030 ///
1031 /// # Arguments
1032 ///
1033 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1034 ///
1035 /// # Returns
1036 ///
1037 /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
1038 /// or `Err(PropertyError)` if the lock is poisoned.
1039 ///
1040 /// # Examples
1041 ///
1042 /// ## Basic RAII Subscription
1043 ///
1044 /// ```rust
1045 /// use observable_property::ObservableProperty;
1046 /// use std::sync::Arc;
1047 ///
1048 /// # fn main() -> Result<(), observable_property::PropertyError> {
1049 /// let property = ObservableProperty::new(0);
1050 ///
1051 /// {
1052 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1053 /// println!("Value: {} -> {}", old, new);
1054 /// }))?;
1055 ///
1056 /// property.set(42)?; // Prints: "Value: 0 -> 42"
1057 /// property.set(100)?; // Prints: "Value: 42 -> 100"
1058 ///
1059 /// // Automatic cleanup when _subscription goes out of scope
1060 /// }
1061 ///
1062 /// property.set(200)?; // No output - subscription was cleaned up
1063 /// # Ok(())
1064 /// # }
1065 /// ```
1066 ///
1067 /// ## Comparison with Manual Management
1068 ///
1069 /// ```rust
1070 /// use observable_property::ObservableProperty;
1071 /// use std::sync::Arc;
1072 ///
1073 /// # fn main() -> Result<(), observable_property::PropertyError> {
1074 /// let property = ObservableProperty::new("initial".to_string());
1075 ///
1076 /// // Method 1: Manual subscription management (traditional approach)
1077 /// let observer_id = property.subscribe(Arc::new(|old, new| {
1078 /// println!("Manual: {} -> {}", old, new);
1079 /// }))?;
1080 ///
1081 /// // Method 2: RAII subscription management (recommended)
1082 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1083 /// println!("RAII: {} -> {}", old, new);
1084 /// }))?;
1085 ///
1086 /// // Both observers will be called
1087 /// property.set("changed".to_string())?;
1088 /// // Prints:
1089 /// // "Manual: initial -> changed"
1090 /// // "RAII: initial -> changed"
1091 ///
1092 /// // Manual cleanup required for first observer
1093 /// property.unsubscribe(observer_id)?;
1094 ///
1095 /// // Second observer (_subscription) is automatically cleaned up when
1096 /// // the variable goes out of scope - no manual intervention needed
1097 /// # Ok(())
1098 /// # }
1099 /// ```
1100 ///
1101 /// ## Error Handling with Early Returns
1102 ///
1103 /// ```rust
1104 /// use observable_property::ObservableProperty;
1105 /// use std::sync::Arc;
1106 ///
1107 /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1108 /// let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1109 /// println!("Processing: {} -> {}", old, new);
1110 /// }))?;
1111 ///
1112 /// property.set(1)?;
1113 ///
1114 /// if property.get()? > 0 {
1115 /// return Ok(()); // Subscription automatically cleaned up on early return
1116 /// }
1117 ///
1118 /// property.set(2)?;
1119 /// Ok(()) // Subscription automatically cleaned up on normal return
1120 /// }
1121 ///
1122 /// # fn main() -> Result<(), observable_property::PropertyError> {
1123 /// let property = ObservableProperty::new(0);
1124 /// process_with_monitoring(&property)?; // Monitoring active only during function call
1125 /// property.set(99)?; // No monitoring output - subscription was cleaned up
1126 /// # Ok(())
1127 /// # }
1128 /// ```
1129 ///
1130 /// ## Multi-threaded Subscription Management
1131 ///
1132 /// ```rust
1133 /// use observable_property::ObservableProperty;
1134 /// use std::sync::Arc;
1135 /// use std::thread;
1136 ///
1137 /// # fn main() -> Result<(), observable_property::PropertyError> {
1138 /// let property = Arc::new(ObservableProperty::new(0));
1139 /// let property_clone = property.clone();
1140 ///
1141 /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1142 /// let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1143 /// println!("Thread observer: {} -> {}", old, new);
1144 /// }))?;
1145 ///
1146 /// property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1147 ///
1148 /// // Subscription automatically cleaned up when thread ends
1149 /// Ok(())
1150 /// });
1151 ///
1152 /// handle.join().unwrap()?;
1153 /// property.set(100)?; // No output - thread subscription was cleaned up
1154 /// # Ok(())
1155 /// # }
1156 /// ```
1157 ///
1158 /// # Use Cases
1159 ///
1160 /// This method is particularly useful in scenarios such as:
1161 /// - Temporary observers that should be active only during a specific scope
1162 /// - Error-prone code where manual cleanup might be forgotten
1163 /// - Complex control flow where multiple exit points make manual cleanup difficult
1164 /// - Resource-constrained environments where observer leaks are problematic
1165 pub fn subscribe_with_subscription(
1166 &self,
1167 observer: Observer<T>,
1168 ) -> Result<Subscription<T>, PropertyError> {
1169 let id = self.subscribe(observer)?;
1170 Ok(Subscription {
1171 inner: Arc::clone(&self.inner),
1172 id,
1173 })
1174 }
1175
1176 /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1177 ///
1178 /// This method combines the functionality of `subscribe_filtered()` with the automatic
1179 /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1180 /// called when the filter condition is satisfied, and it will be automatically
1181 /// unsubscribed when the returned `Subscription` goes out of scope.
1182 ///
1183 /// # Arguments
1184 ///
1185 /// * `observer` - The observer function to call when the filter passes
1186 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1187 ///
1188 /// # Returns
1189 ///
1190 /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
1191 /// or `Err(PropertyError)` if the lock is poisoned.
1192 ///
1193 /// # Examples
1194 ///
1195 /// ## Basic Filtered RAII Subscription
1196 ///
1197 /// ```rust
1198 /// use observable_property::ObservableProperty;
1199 /// use std::sync::Arc;
1200 ///
1201 /// # fn main() -> Result<(), observable_property::PropertyError> {
1202 /// let counter = ObservableProperty::new(0);
1203 ///
1204 /// {
1205 /// // Monitor only increases with automatic cleanup
1206 /// let _increase_monitor = counter.subscribe_filtered_with_subscription(
1207 /// Arc::new(|old, new| {
1208 /// println!("Counter increased: {} -> {}", old, new);
1209 /// }),
1210 /// |old, new| new > old
1211 /// )?;
1212 ///
1213 /// counter.set(5)?; // Prints: "Counter increased: 0 -> 5"
1214 /// counter.set(3)?; // No output (decrease)
1215 /// counter.set(7)?; // Prints: "Counter increased: 3 -> 7"
1216 ///
1217 /// // Subscription automatically cleaned up when leaving scope
1218 /// }
1219 ///
1220 /// counter.set(10)?; // No output - subscription was cleaned up
1221 /// # Ok(())
1222 /// # }
1223 /// ```
1224 ///
1225 /// ## Multi-Condition Temperature Monitoring
1226 ///
1227 /// ```rust
1228 /// use observable_property::ObservableProperty;
1229 /// use std::sync::Arc;
1230 ///
1231 /// # fn main() -> Result<(), observable_property::PropertyError> {
1232 /// let temperature = ObservableProperty::new(20.0_f64);
1233 ///
1234 /// {
1235 /// // Create filtered subscription that only triggers for significant temperature increases
1236 /// let _heat_warning = temperature.subscribe_filtered_with_subscription(
1237 /// Arc::new(|old_temp, new_temp| {
1238 /// println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1239 /// old_temp, new_temp);
1240 /// }),
1241 /// |old, new| new > old && (new - old) > 5.0 // Only trigger for increases > 5°C
1242 /// )?;
1243 ///
1244 /// // Create another filtered subscription for cooling alerts
1245 /// let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1246 /// Arc::new(|old_temp, new_temp| {
1247 /// println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1248 /// old_temp, new_temp);
1249 /// }),
1250 /// |old, new| new < old && (old - new) > 3.0 // Only trigger for decreases > 3°C
1251 /// )?;
1252 ///
1253 /// // Test the filters
1254 /// temperature.set(22.0)?; // No alerts (increase of only 2°C)
1255 /// temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1256 /// temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1257 ///
1258 /// // Both subscriptions are automatically cleaned up when they go out of scope
1259 /// }
1260 ///
1261 /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1262 /// # Ok(())
1263 /// # }
1264 /// ```
1265 ///
1266 /// ## Conditional Monitoring with Complex Filters
1267 ///
1268 /// ```rust
1269 /// use observable_property::ObservableProperty;
1270 /// use std::sync::Arc;
1271 ///
1272 /// # fn main() -> Result<(), observable_property::PropertyError> {
1273 /// let stock_price = ObservableProperty::new(100.0_f64);
1274 ///
1275 /// {
1276 /// // Monitor significant price movements (> 5% change)
1277 /// let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1278 /// Arc::new(|old_price, new_price| {
1279 /// let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1280 /// println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1281 /// old_price, new_price, change_percent);
1282 /// }),
1283 /// |old, new| {
1284 /// let change_percent = ((new - old) / old * 100.0).abs();
1285 /// change_percent > 5.0 // Trigger on > 5% change
1286 /// }
1287 /// )?;
1288 ///
1289 /// stock_price.set(103.0)?; // No alert (3% change)
/// stock_price.set(108.0)?; // No alert (about 4.85% change from 103.0, below the 5% threshold)
1291 /// stock_price.set(95.0)?; // Alert triggered (12% decrease)
1292 ///
1293 /// // Subscription automatically cleaned up when leaving scope
1294 /// }
1295 ///
1296 /// stock_price.set(200.0)?; // No alert - monitoring ended
1297 /// # Ok(())
1298 /// # }
1299 /// ```
1300 ///
1301 /// ## Cross-Thread Filtered Monitoring
1302 ///
1303 /// ```rust
1304 /// use observable_property::ObservableProperty;
1305 /// use std::sync::Arc;
1306 /// use std::thread;
1307 /// use std::time::Duration;
1308 ///
1309 /// # fn main() -> Result<(), observable_property::PropertyError> {
1310 /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1311 /// let latency_clone = network_latency.clone();
1312 ///
1313 /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1314 /// // Monitor high latency in background thread with automatic cleanup
1315 /// let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1316 /// Arc::new(|old_ms, new_ms| {
1317 /// println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1318 /// }),
1319 /// |_, new| *new > 100 // Alert when latency exceeds 100ms
1320 /// )?;
1321 ///
1322 /// // Simulate monitoring for a short time
1323 /// thread::sleep(Duration::from_millis(10));
1324 ///
1325 /// // Subscription automatically cleaned up when thread ends
1326 /// Ok(())
1327 /// });
1328 ///
1329 /// // Simulate network conditions
1330 /// network_latency.set(80)?; // No alert (under threshold)
/// network_latency.set(150)?; // Alert fires only if the background subscription is active at this moment
1332 ///
1333 /// monitor_handle.join().unwrap()?;
1334 /// network_latency.set(200)?; // No alert - background monitoring ended
1335 /// # Ok(())
1336 /// # }
1337 /// ```
1338 ///
1339 /// # Use Cases
1340 ///
1341 /// This method is ideal for:
1342 /// - Threshold-based monitoring with automatic cleanup
1343 /// - Temporary conditional observers in specific code blocks
1344 /// - Event-driven systems where observers should be active only during certain phases
1345 /// - Resource management scenarios where filtered observers have limited lifetimes
1346 ///
1347 /// # Performance Notes
1348 ///
1349 /// The filter function is evaluated for every property change, so it should be
1350 /// lightweight. Complex filtering logic should be optimized to avoid performance
1351 /// bottlenecks, especially in high-frequency update scenarios.
1352 pub fn subscribe_filtered_with_subscription<F>(
1353 &self,
1354 observer: Observer<T>,
1355 filter: F,
1356 ) -> Result<Subscription<T>, PropertyError>
1357 where
1358 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1359 {
1360 let id = self.subscribe_filtered(observer, filter)?;
1361 Ok(Subscription {
1362 inner: Arc::clone(&self.inner),
1363 id,
1364 })
1365 }
1366}
1367
1368impl<T: Clone> Clone for ObservableProperty<T> {
1369 /// Creates a new reference to the same observable property
1370 ///
1371 /// This creates a new `ObservableProperty` instance that shares the same
1372 /// underlying data with the original. Changes made through either instance
1373 /// will be visible to observers subscribed through both instances.
1374 ///
1375 /// # Examples
1376 ///
1377 /// ```rust
1378 /// use observable_property::ObservableProperty;
1379 /// use std::sync::Arc;
1380 ///
1381 /// let property1 = ObservableProperty::new(42);
1382 /// let property2 = property1.clone();
1383 ///
1384 /// property2.subscribe(Arc::new(|old, new| {
1385 /// println!("Observer on property2 saw change: {} -> {}", old, new);
1386 /// })).map_err(|e| {
1387 /// eprintln!("Failed to subscribe: {}", e);
1388 /// e
1389 /// })?;
1390 ///
1391 /// // This change through property1 will trigger the observer on property2
1392 /// property1.set(100).map_err(|e| {
1393 /// eprintln!("Failed to set value: {}", e);
1394 /// e
1395 /// })?;
1396 /// # Ok::<(), observable_property::PropertyError>(())
1397 /// ```
1398 fn clone(&self) -> Self {
1399 Self {
1400 inner: Arc::clone(&self.inner),
1401 max_threads: self.max_threads,
1402 }
1403 }
1404}
1405
1406impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1407 /// Debug implementation that shows the current value if accessible
1408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1409 match self.get() {
1410 Ok(value) => f
1411 .debug_struct("ObservableProperty")
1412 .field("value", &value)
1413 .field("observers_count", &"[hidden]")
1414 .field("max_threads", &self.max_threads)
1415 .finish(),
1416 Err(_) => f
1417 .debug_struct("ObservableProperty")
1418 .field("value", &"[inaccessible]")
1419 .field("observers_count", &"[hidden]")
1420 .finish(),
1421 }
1422 }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427 use super::*;
1428 use std::sync::atomic::{AtomicUsize, Ordering};
1429 use std::time::Duration;
1430
#[test]
fn test_property_creation_and_basic_operations() {
    let prop = ObservableProperty::new(42);

    // A fresh property reports the value it was constructed with.
    let initial = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get initial value: {}", e));
    assert_eq!(initial, 42);

    // Writing a new value must succeed...
    prop.set(100)
        .unwrap_or_else(|e| panic!("Failed to set value: {}", e));

    // ...and be visible to the next read.
    let updated = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get updated value: {}", e));
    assert_eq!(updated, 100);
}
1451
#[test]
fn test_observer_subscription_and_notification() {
    let prop = ObservableProperty::new("initial".to_string());
    let notification_count = Arc::new(AtomicUsize::new(0));
    let last_old_value = Arc::new(RwLock::new(String::new()));
    let last_new_value = Arc::new(RwLock::new(String::new()));

    // Handles moved into the observer so it can record what it was given.
    let hits = Arc::clone(&notification_count);
    let seen_old = Arc::clone(&last_old_value);
    let seen_new = Arc::clone(&last_new_value);

    let observer_id = prop
        .subscribe(Arc::new(move |old, new| {
            hits.fetch_add(1, Ordering::SeqCst);
            if let Ok(mut slot) = seen_old.write() {
                *slot = old.clone();
            }
            if let Ok(mut slot) = seen_new.write() {
                *slot = new.clone();
            }
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe observer: {}", e));

    // A single write should produce exactly one notification...
    prop.set("changed".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // ...carrying the previous and the new value.
    assert_eq!(
        *last_old_value
            .read()
            .unwrap_or_else(|e| panic!("Failed to read old value: {:?}", e)),
        "initial"
    );
    assert_eq!(
        *last_new_value
            .read()
            .unwrap_or_else(|e| panic!("Failed to read new value: {:?}", e)),
        "changed"
    );

    // Removing the observer reports that it had been registered.
    let was_present = prop
        .unsubscribe(observer_id)
        .unwrap_or_else(|e| panic!("Failed to unsubscribe observer: {}", e));
    assert!(was_present);

    // Writes after unsubscribing must leave the counter untouched.
    prop.set("not_notified".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value after unsubscribe: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);
}
1505
#[test]
fn test_filtered_observer() {
    let prop = ObservableProperty::new(0i32);
    let notification_count = Arc::new(AtomicUsize::new(0));
    let hits = Arc::clone(&notification_count);

    // Register an observer gated on "new value is larger than the old one".
    let observer_id = prop
        .subscribe_filtered(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .unwrap_or_else(|e| panic!("Failed to subscribe filtered observer: {}", e));

    // 0 -> 5 is an increase: notified.
    prop.set(5)
        .unwrap_or_else(|e| panic!("Failed to set property value to 5: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 5 -> 3 is a decrease: filtered out.
    prop.set(3)
        .unwrap_or_else(|e| panic!("Failed to set property value to 3: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 3 -> 10 is an increase again: notified.
    prop.set(10)
        .unwrap_or_else(|e| panic!("Failed to set property value to 10: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 2);

    if let Err(e) = prop.unsubscribe(observer_id) {
        panic!("Failed to unsubscribe filtered observer: {}", e);
    }
}
1546
#[test]
fn test_thread_safety_concurrent_reads() {
    let prop = Arc::new(ObservableProperty::new(42i32));
    let num_threads = 10;
    let reads_per_thread = 100;

    // Start every reader before joining any of them so the reads overlap.
    let mut readers = Vec::new();
    for _ in 0..num_threads {
        let shared = Arc::clone(&prop);
        readers.push(thread::spawn(move || {
            for _ in 0..reads_per_thread {
                let value = shared
                    .get()
                    .unwrap_or_else(|e| panic!("Failed to read property value: {}", e));
                assert_eq!(value, 42);
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // A reader that panicked surfaces here as a failed join.
    for reader in readers {
        reader
            .join()
            .unwrap_or_else(|e| panic!("Thread failed to complete: {:?}", e));
    }
}
1574
#[test]
fn test_async_set_performance() {
    let prop = ObservableProperty::new(0i32);
    let slow_observer_count = Arc::new(AtomicUsize::new(0));
    let completed = Arc::clone(&slow_observer_count);

    // One deliberately slow observer: 50 ms of "work" per notification.
    let _id = prop
        .subscribe(Arc::new(move |_, _| {
            thread::sleep(Duration::from_millis(50));
            completed.fetch_add(1, Ordering::SeqCst);
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe slow observer: {}", e));

    // Time a synchronous set, which is expected to wait for the observer.
    let sync_started = std::time::Instant::now();
    prop.set(1)
        .unwrap_or_else(|e| panic!("Failed to set property value synchronously: {}", e));
    let sync_duration = sync_started.elapsed();

    // Time an asynchronous set, which is expected to return without waiting.
    let async_started = std::time::Instant::now();
    prop.set_async(2)
        .unwrap_or_else(|e| panic!("Failed to set property value asynchronously: {}", e));
    let async_duration = async_started.elapsed();

    // The asynchronous call must beat the synchronous one and finish quickly.
    assert!(async_duration < sync_duration);
    assert!(async_duration.as_millis() < 10); // Should be very fast

    // Give the background notification time to finish.
    thread::sleep(Duration::from_millis(100));

    // The single observer ran once per set call (sync + async).
    assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
}
1614
#[test]
fn test_lock_poisoning() {
    let prop = Arc::new(ObservableProperty::new(0));
    let poisoner_handle = Arc::clone(&prop);

    // Panic while holding the write guard so the lock ends up poisoned.
    let poison_thread = thread::spawn(move || {
        let _guard = poisoner_handle
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });

    // The join result is the panic payload; it is irrelevant to this test.
    let _ = poison_thread.join();

    // Reads must report the poisoning specifically.
    match prop.get() {
        Err(PropertyError::PoisonedLock) => {}
        Err(e) => panic!("Expected PoisonedLock error, got: {:?}", e),
        Ok(_) => panic!("get() should fail on a poisoned lock"),
    }

    // Writes may report either lock-related variant.
    match prop.set(42) {
        Err(PropertyError::WriteLockError { .. }) | Err(PropertyError::PoisonedLock) => {}
        Err(e) => panic!("Expected lock-related error, got: {:?}", e),
        Ok(_) => panic!("set() should fail on a poisoned lock"),
    }

    // Subscribing needs the write lock as well, so it must fail the same way.
    match prop.subscribe(Arc::new(|_, _| {})) {
        Err(PropertyError::WriteLockError { .. }) | Err(PropertyError::PoisonedLock) => {}
        Err(e) => panic!("Expected lock-related error, got: {:?}", e),
        Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
    }
}
1656
#[test]
fn test_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let call_counts = Arc::new(AtomicUsize::new(0));

    // Registered first: an observer that always panics.
    let panic_observer_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("This observer deliberately panics");
        }))
        .expect("Failed to subscribe panic observer");

    // Registered second: a well-behaved observer that counts its invocations.
    let normal_observer_id = {
        let counter = Arc::clone(&call_counts);
        prop.subscribe(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe normal observer")
    };

    // The set itself must succeed even though one observer panics.
    prop.set(42).expect("Failed to set property value");

    // The well-behaved observer still ran exactly once.
    assert_eq!(call_counts.load(Ordering::SeqCst), 1);

    // Remove both observers again.
    prop.unsubscribe(panic_observer_id)
        .expect("Failed to unsubscribe panic observer");
    prop.unsubscribe(normal_observer_id)
        .expect("Failed to unsubscribe normal observer");
}
1687
#[test]
fn test_unsubscribe_nonexistent_observer() {
    let property = ObservableProperty::new(0);

    // Obtain a real ID, then derive one that was never handed out.
    let valid_id = property
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe test observer");
    let nonexistent_id = valid_id + 1000;

    // Removing an unknown ID is not an error; it just reports `false`.
    let was_present = property
        .unsubscribe(nonexistent_id)
        .unwrap_or_else(|e| panic!("Unsubscribe returned error: {:?}", e));
    assert!(
        !was_present,
        "Unsubscribe should return false for nonexistent ID"
    );

    // The first removal of the real ID consumes it...
    property
        .unsubscribe(valid_id)
        .expect("Failed first unsubscribe");

    // ...so a second removal of the same ID finds nothing.
    let result = property
        .unsubscribe(valid_id)
        .expect("Failed second unsubscribe");
    assert!(!result, "Second unsubscribe should return false");
}
1715
#[test]
fn test_observer_id_wraparound() {
    let prop = ObservableProperty::new(0);

    // Three back-to-back subscriptions should receive consecutive IDs.
    let id1 = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 1");
    let id2 = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 2");
    let id3 = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 3");

    // Strictly increasing...
    assert!(id2 > id1, "Observer IDs should increment");
    assert!(id3 > id2, "Observer IDs should continue incrementing");
    // ...and with a step of exactly one.
    assert_eq!(id2, id1 + 1, "Observer IDs should increment by 1");
    assert_eq!(id3, id2 + 1, "Observer IDs should increment by 1");

    // Remove the observers again.
    prop.unsubscribe(id1)
        .expect("Failed to unsubscribe observer 1");
    prop.unsubscribe(id2)
        .expect("Failed to unsubscribe observer 2");
    prop.unsubscribe(id3)
        .expect("Failed to unsubscribe observer 3");
}
1735
#[test]
fn test_concurrent_subscribe_unsubscribe() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 100;

    // Launch all workers first so their subscribe/unsubscribe calls interleave.
    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared = Arc::clone(&prop);
        workers.push(thread::spawn(move || {
            // IDs this worker has registered and not yet removed.
            let mut live_ids = Vec::new();

            for i in 0..operations_per_thread {
                let observer_id = shared
                    .subscribe(Arc::new(move |old, new| {
                        // Touch every captured and passed value, like a real observer would.
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Subscribe should succeed");
                live_ids.push(observer_id);

                // Every tenth iteration, drop one of this worker's observers.
                if i % 10 == 0 && !live_ids.is_empty() {
                    let idx = i % live_ids.len();
                    let id_to_remove = live_ids.remove(idx);
                    shared
                        .unsubscribe(id_to_remove)
                        .expect("Unsubscribe should succeed");
                }
            }

            // Remove whatever this worker still has registered.
            for id in live_ids {
                shared
                    .unsubscribe(id)
                    .expect("Final cleanup should succeed");
            }
        }));
    }

    // Any panic inside a worker surfaces as a failed join.
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must remain usable afterwards.
    prop.set(42)
        .expect("Property should still work after concurrent operations");
}
1788
#[test]
fn test_multiple_observer_panics_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // Two observers that always panic, registered first.
    let _panic_id1 = prop
        .subscribe(Arc::new(|_, _| {
            panic!("First panic observer");
        }))
        .expect("Failed to subscribe first panic observer");
    let _panic_id2 = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Second panic observer");
        }))
        .expect("Failed to subscribe second panic observer");

    // Two well-behaved observers that share one counter.
    let first_counter = Arc::clone(&successful_calls);
    let _success_id1 = prop
        .subscribe(Arc::new(move |_, _| {
            first_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe first success observer");
    let second_counter = Arc::clone(&successful_calls);
    let _success_id2 = prop
        .subscribe(Arc::new(move |_, _| {
            second_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe second success observer");

    // One write notifies all four observers.
    prop.set(42)
        .expect("Failed to set property value for panic isolation test");

    // Neither panic may have prevented the well-behaved observers from running.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
}
1828
#[test]
fn test_async_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // An observer that always panics, registered first.
    let _panic_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Async panic observer");
        }))
        .expect("Failed to subscribe async panic observer");

    // A well-behaved observer that counts its invocations.
    let counter = Arc::clone(&successful_calls);
    let _success_id = prop
        .subscribe(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe async success observer");

    // Notify through the asynchronous setter.
    prop.set_async(42)
        .expect("Failed to set property value asynchronously");

    // Give the background notification time to finish.
    thread::sleep(Duration::from_millis(100));

    // The panic must not have kept the well-behaved observer from running.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
}
1858
/// Registers a large number of observers and checks that a single `set`
/// notifies every one of them with the expected old and new values.
///
/// Value checks are recorded in an atomic counter and asserted on the test
/// thread rather than with `assert!` inside the observer: a panic raised
/// inside an observer does not fail the test (see the panic-isolation tests),
/// so an in-observer assertion could never fail it.
#[test]
fn test_very_large_observer_count() {
    let prop = ObservableProperty::new(0);
    let total_calls = Arc::new(AtomicUsize::new(0));
    let value_mismatches = Arc::new(AtomicUsize::new(0));
    let observer_count = 1000;

    // Subscribe many observers
    let mut observer_ids = Vec::with_capacity(observer_count);
    for _ in 0..observer_count {
        let count = total_calls.clone();
        let mismatches = value_mismatches.clone();
        let id = prop
            .subscribe(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                // Every observer sees the same transition: 0 -> observer_count.
                if *old != 0 || *new != observer_count {
                    mismatches.fetch_add(1, Ordering::SeqCst);
                }
            }))
            .expect("Failed to subscribe large observer count test observer");
        observer_ids.push(id);
    }

    // Trigger all observers
    prop.set(observer_count)
        .expect("Failed to set property value for large observer count test");

    // All observers should have been called, each with the right values
    assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
    assert_eq!(value_mismatches.load(Ordering::SeqCst), 0);

    // Clean up
    for id in observer_ids {
        prop.unsubscribe(id)
            .expect("Failed to unsubscribe observer in large count test");
    }
}
1891
/// Checks that an observer can accumulate state behind an `RwLock` and that
/// it sees each `(old, new)` transition in the order the changes were made.
#[test]
fn test_observer_with_mutable_state() {
    let property = ObservableProperty::new(0);
    let recorded = Arc::new(RwLock::new(Vec::new()));

    let sink = Arc::clone(&recorded);
    let id = property
        .subscribe(Arc::new(move |old, new| {
            // Skip recording if the lock cannot be acquired.
            if let Ok(mut entries) = sink.write() {
                entries.push((*old, *new));
            }
        }))
        .expect("Failed to subscribe mutable state observer");

    // Three consecutive changes: 0 -> 1 -> 2 -> 3.
    property.set(1).expect("Failed to set property to 1");
    property.set(2).expect("Failed to set property to 2");
    property.set(3).expect("Failed to set property to 3");

    // The recorded transitions must match the changes exactly.
    let entries = recorded.read().expect("Failed to read call history");
    assert_eq!(*entries, vec![(0, 1), (1, 2), (2, 3)]);

    property
        .unsubscribe(id)
        .expect("Failed to unsubscribe mutable state observer");
}
1920
/// Checks that the guard returned by `subscribe_with_subscription` stops the
/// observer from being notified once the guard leaves scope.
#[test]
fn test_subscription_automatic_cleanup() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    {
        let counter = Arc::clone(&invocations);
        let _guard = property
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription for automatic cleanup test");

        // While the guard is alive the observer is notified.
        property
            .set(1)
            .expect("Failed to set property value in subscription test");
        assert_eq!(invocations.load(Ordering::SeqCst), 1);

        // `_guard` is dropped at the end of this block.
    }

    // After the guard is gone, further changes must not reach the observer.
    property
        .set(2)
        .expect("Failed to set property value after subscription dropped");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);
}
1946
/// Checks that calling `drop` on a subscription guard deactivates the observer.
#[test]
fn test_subscription_explicit_drop() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&invocations);
    let guard = property
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for explicit drop test");

    // The observer is notified while the guard is held.
    property
        .set(1)
        .expect("Failed to set property value before explicit drop");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // Release the guard by hand.
    drop(guard);

    // The next change must leave the counter untouched.
    property
        .set(2)
        .expect("Failed to set property value after explicit drop");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);
}
1970
/// Checks that dropping one of several subscription guards deactivates only
/// the observer that guard belongs to.
#[test]
fn test_multiple_subscriptions_with_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count1 = Arc::new(AtomicUsize::new(0));
    let call_count2 = Arc::new(AtomicUsize::new(0));
    let call_count3 = Arc::new(AtomicUsize::new(0));

    let first = Arc::clone(&call_count1);
    let second = Arc::clone(&call_count2);
    let third = Arc::clone(&call_count3);

    let guard_one = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            first.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create first subscription");
    let guard_two = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            second.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create second subscription");
    let guard_three = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            third.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create third subscription");

    // Reads all three counters at once so each stage is a single comparison.
    let snapshot = || {
        (
            call_count1.load(Ordering::SeqCst),
            call_count2.load(Ordering::SeqCst),
            call_count3.load(Ordering::SeqCst),
        )
    };

    // Stage 1: every observer is active.
    prop.set(1)
        .expect("Failed to set property value with all subscriptions");
    assert_eq!(snapshot(), (1, 1, 1));

    // Stage 2: only the second observer is released.
    drop(guard_two);
    prop.set(2)
        .expect("Failed to set property value with partial subscriptions");
    assert_eq!(snapshot(), (2, 1, 2));

    // Stage 3: the remaining observers are released as well.
    drop(guard_one);
    drop(guard_three);
    prop.set(3)
        .expect("Failed to set property value with no subscriptions");
    assert_eq!(snapshot(), (2, 1, 2));
}
2025
/// Checks that dropping a subscription guard does not panic when the
/// property's internal lock has been poisoned by another thread.
#[test]
fn test_subscription_drop_with_poisoned_lock() {
    let property = Arc::new(ObservableProperty::new(0));
    let property_for_poisoner = Arc::clone(&property);

    // Register an observer so there is a guard to drop later.
    let invocations = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&invocations);
    let guard = property
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for poisoned lock test");

    // Panic while holding the write lock so the lock ends up poisoned.
    let poisoner = thread::spawn(move || {
        let _write_guard = property_for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });
    // The join result carries the deliberate panic; it is intentionally discarded.
    poisoner.join().ok();

    // Reaching the end of this function without a panic is the success
    // criterion: the guard's cleanup must tolerate the poisoned lock.
    drop(guard);
}
2053
/// Checks that an id-based subscription (removed with `unsubscribe`) and a
/// guard-based subscription can coexist on one property, and that removing
/// the former leaves the latter active.
#[test]
fn test_subscription_vs_manual_unsubscribe() {
    let property = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));

    // Id-based subscription: must be removed explicitly.
    let manual_counter = Arc::clone(&manual_count);
    let manual_id = property
        .subscribe(Arc::new(move |_, _| {
            manual_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create manual subscription");

    // Guard-based subscription: released when the guard is dropped.
    let auto_counter = Arc::clone(&auto_count);
    let _auto_guard = property
        .subscribe_with_subscription(Arc::new(move |_, _| {
            auto_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create automatic subscription");

    // Reads both counters as a `(manual, auto)` pair.
    let counts = || {
        (
            manual_count.load(Ordering::SeqCst),
            auto_count.load(Ordering::SeqCst),
        )
    };

    // Both observers react to the first change.
    property
        .set(1)
        .expect("Failed to set property value with both subscriptions");
    assert_eq!(counts(), (1, 1));

    // Remove the id-based observer.
    property
        .unsubscribe(manual_id)
        .expect("Failed to manually unsubscribe");

    // Only the guard-based observer reacts to the second change.
    property
        .set(2)
        .expect("Failed to set property value after manual unsubscribe");
    assert_eq!(counts(), (1, 2));

    // `_auto_guard` is dropped when the function returns.
}
2091
/// Checks that `subscribe_with_subscription` returns an error, rather than
/// panicking, when the property's internal lock is poisoned.
#[test]
fn test_subscribe_with_subscription_error_handling() {
    let property = Arc::new(ObservableProperty::new(0));
    let property_for_poisoner = Arc::clone(&property);

    // Panic while holding the write lock so the lock ends up poisoned.
    let poisoner = thread::spawn(move || {
        let _write_guard = property_for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });
    poisoner.join().ok();

    // Subscribing afterwards has to fail.
    let outcome = property.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(outcome.is_err());

    // Either lock-related error variant is acceptable here.
    let error = outcome.expect_err("Expected error for poisoned lock");
    if !matches!(
        error,
        PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock
    ) {
        panic!("Unexpected error type: {:?}", error);
    }
}
2114
/// Checks that an observer registered through one handle of a property is
/// notified by changes made through a clone of that handle as well.
#[test]
fn test_subscription_with_property_cloning() {
    let original = ObservableProperty::new(0);
    let duplicate = original.clone();
    let invocations = Arc::new(AtomicUsize::new(0));

    // Register the observer on the original handle only.
    let counter = Arc::clone(&invocations);
    let _guard = original
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cloned property test");

    // A change made through the clone reaches the observer...
    duplicate
        .set(42)
        .expect("Failed to set property value through prop2");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // ...and so does a change made through the original.
    original
        .set(100)
        .expect("Failed to set property value through prop1");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);
}
2137
/// Checks that a subscription guard created inside an `if` block is released
/// when that block ends.
#[test]
fn test_subscription_in_conditional_blocks() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    let should_subscribe = true;

    if should_subscribe {
        let counter = Arc::clone(&invocations);
        let _guard = property
            .subscribe_with_subscription(Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription in conditional block");

        // Inside the block the observer is notified.
        property
            .set(1)
            .expect("Failed to set property value in conditional block");
        assert_eq!(invocations.load(Ordering::SeqCst), 1);

        // `_guard` is dropped on leaving the block.
    }

    // Outside the block the observer must stay silent.
    property
        .set(2)
        .expect("Failed to set property value after conditional block");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);
}
2164
/// Checks that a subscription guard held in a function is released on both an
/// early `return` and a normal exit.
///
/// The call counter is owned by the test and shared with the helper, so the
/// test can assert that later property changes no longer reach the observer
/// once the helper has returned.
#[test]
fn test_subscription_with_early_return() {
    /// Subscribes an observer that bumps `call_count`, changes the property
    /// once or twice depending on `should_return_early`, and returns. The
    /// guard is a local, so it is dropped on either exit path.
    fn test_function(
        prop: &ObservableProperty<i32>,
        call_count: &Arc<AtomicUsize>,
        should_return_early: bool,
    ) -> Result<(), PropertyError> {
        // The counter is shared across calls, so assert relative to its
        // value on entry.
        let baseline = call_count.load(Ordering::SeqCst);
        let count = Arc::clone(call_count);

        let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
            count.fetch_add(1, Ordering::SeqCst);
        }))?;

        prop.set(1)?;
        assert_eq!(call_count.load(Ordering::SeqCst), baseline + 1);

        if should_return_early {
            return Ok(()); // Subscription should be cleaned up here
        }

        prop.set(2)?;
        assert_eq!(call_count.load(Ordering::SeqCst), baseline + 2);

        Ok(())
        // Subscription cleaned up when function exits normally
    }

    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    // Test early return
    test_function(&prop, &call_count, true).expect("Failed to test early return");
    let after_early_return = call_count.load(Ordering::SeqCst);

    // Verify observer is no longer active after early return
    prop.set(10)
        .expect("Failed to set property value after early return");
    assert_eq!(call_count.load(Ordering::SeqCst), after_early_return);

    // Test normal exit
    test_function(&prop, &call_count, false).expect("Failed to test normal exit");
    let after_normal_exit = call_count.load(Ordering::SeqCst);

    // Verify observer is no longer active after normal exit
    prop.set(20)
        .expect("Failed to set property value after normal exit");
    assert_eq!(call_count.load(Ordering::SeqCst), after_normal_exit);
}
2206
/// Checks that moving a subscription guard to another binding keeps the
/// observer active, and that dropping the new binding deactivates it.
#[test]
fn test_subscription_move_semantics() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&invocations);
    let guard = property
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for move semantics test");

    // Notified while held by the original binding.
    property
        .set(1)
        .expect("Failed to set property value before move");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // Transfer ownership of the guard to a different binding.
    let relocated_guard = guard;

    // Still notified after the move.
    property
        .set(2)
        .expect("Failed to set property value after move");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);

    // Dropping the new owner releases the subscription.
    drop(relocated_guard);

    // No further notifications from here on.
    property
        .set(3)
        .expect("Failed to set property value after moved subscription drop");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);
}
2237
/// Checks that a filtered, guard-based subscription only fires when its
/// filter accepts the change, and stops firing once the guard is dropped.
#[test]
fn test_filtered_subscription_automatic_cleanup() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    {
        let counter = Arc::clone(&invocations);
        let _guard = property
            .subscribe_filtered_with_subscription(
                Arc::new(move |_, _| {
                    counter.fetch_add(1, Ordering::SeqCst);
                }),
                // Only increases pass the filter.
                |old, new| new > old,
            )
            .expect("Failed to create filtered subscription");

        // Each step is (value to set, expected total calls, failure message):
        // 0 -> 5 fires, 5 -> 3 is filtered out, 3 -> 10 fires.
        let steps = [
            (5, 1, "Failed to set property value to 5 in filtered test"),
            (3, 1, "Failed to set property value to 3 in filtered test"),
            (10, 2, "Failed to set property value to 10 in filtered test"),
        ];
        for &(value, expected_calls, failure_message) in steps.iter() {
            property.set(value).expect(failure_message);
            assert_eq!(invocations.load(Ordering::SeqCst), expected_calls);
        }

        // `_guard` is dropped at the end of this block.
    }

    // An increase after the guard is gone must not be observed.
    property
        .set(20)
        .expect("Failed to set property value after filtered subscription cleanup");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);
}
2273
/// Checks that several filtered subscriptions with different predicates
/// (increase, decrease, change larger than 5) fire independently of each other.
#[test]
fn test_multiple_filtered_subscriptions() {
    let property = ObservableProperty::new(10);
    let increase_count = Arc::new(AtomicUsize::new(0));
    let decrease_count = Arc::new(AtomicUsize::new(0));
    let significant_change_count = Arc::new(AtomicUsize::new(0));

    let on_increase = Arc::clone(&increase_count);
    let on_decrease = Arc::clone(&decrease_count);
    let on_significant = Arc::clone(&significant_change_count);

    let _increase_guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_increase.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create increase subscription");

    let _decrease_guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_decrease.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new < old,
        )
        .expect("Failed to create decrease subscription");

    let _significant_guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_significant.fetch_add(1, Ordering::SeqCst);
            }),
            // Strictly more than 5 apart, in either direction.
            |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
        )
        .expect("Failed to create significant change subscription");

    // Each step: (value, failure message, expected increase / decrease /
    // significant totals after the change).
    let steps = [
        // 10 -> 15 (+5): increase only; a difference of exactly 5 is not significant.
        (15, "Failed to set property to 15 in multiple filtered test", 1, 0, 0),
        // 15 -> 25 (+10): increase and significant.
        (25, "Failed to set property to 25 in multiple filtered test", 2, 0, 1),
        // 25 -> 5 (-20): decrease and significant.
        (5, "Failed to set property to 5 in multiple filtered test", 2, 1, 2),
        // 5 -> 3 (-2): decrease only.
        (3, "Failed to set property to 3 in multiple filtered test", 2, 2, 2),
    ];
    for &(value, failure_message, increases, decreases, significant) in steps.iter() {
        property.set(value).expect(failure_message);
        assert_eq!(increase_count.load(Ordering::SeqCst), increases);
        assert_eq!(decrease_count.load(Ordering::SeqCst), decreases);
        assert_eq!(significant_change_count.load(Ordering::SeqCst), significant);
    }

    // All guards are dropped when the function returns.
}
2338
/// Exercises a filtered subscription on an `f64` property whose filter
/// requires both a change of integer part and a move of more than 0.5.
#[test]
fn test_filtered_subscription_complex_filter() {
    let property = ObservableProperty::new(0.0f64);
    let invocations = Arc::new(AtomicUsize::new(0));
    let transitions = Arc::new(RwLock::new(Vec::new()));

    let counter = Arc::clone(&invocations);
    let sink = Arc::clone(&transitions);
    let _guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |old, new| {
                counter.fetch_add(1, Ordering::SeqCst);
                if let Ok(mut seen) = sink.write() {
                    seen.push((*old, *new));
                }
            }),
            |old, new| {
                // Both conditions must hold: the integer part changes and
                // the absolute difference exceeds 0.5.
                let crossed_integer = (old.floor() as i32) != (new.floor() as i32);
                let large_enough = (new - old).abs() > 0.5_f64;
                crossed_integer && large_enough
            },
        )
        .expect("Failed to create complex filtered subscription");

    // 0.0 -> 0.3 -> 0.7: integer part stays 0, nothing fires.
    property
        .set(0.3)
        .expect("Failed to set property to 0.3 in complex filter test");
    property
        .set(0.7)
        .expect("Failed to set property to 0.7 in complex filter test");
    assert_eq!(invocations.load(Ordering::SeqCst), 0);

    // 0.7 -> 1.3: integer part changes and the move is about 0.6, so it fires.
    property
        .set(1.3)
        .expect("Failed to set property to 1.3 in complex filter test");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // 1.3 -> 1.9 keeps the integer part; 1.9 -> 2.1 changes it but only moves
    // by about 0.2. Neither fires.
    property
        .set(1.9)
        .expect("Failed to set property to 1.9 in complex filter test");
    property
        .set(2.1)
        .expect("Failed to set property to 2.1 in complex filter test");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // 2.1 -> 3.5: integer part changes with a large move, so it fires.
    property
        .set(3.5)
        .expect("Failed to set property to 3.5 in complex filter test");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);

    // The observer must have been handed exactly the two accepted transitions.
    let seen = transitions
        .read()
        .expect("Failed to read values in complex filter test");
    assert_eq!(seen.len(), 2);
    assert_eq!(seen[0], (0.7, 1.3));
    assert_eq!(seen[1], (2.1, 3.5));
}
2389
/// Checks that `subscribe_filtered_with_subscription` returns an error when
/// the property's internal lock is poisoned.
#[test]
fn test_filtered_subscription_error_handling() {
    let property = Arc::new(ObservableProperty::new(0));
    let property_for_poisoner = Arc::clone(&property);

    // Panic while holding the write lock so the lock ends up poisoned.
    let poisoner = thread::spawn(move || {
        let _write_guard = property_for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for filtered subscription poison test");
        panic!("Deliberate panic to poison the lock");
    });
    poisoner.join().ok();

    // Subscribing afterwards has to fail.
    let outcome =
        property.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(outcome.is_err());
}
2406
/// Checks that an id-based filtered subscription and a guard-based filtered
/// subscription with the same predicate behave alike, and that unsubscribing
/// the id-based one leaves the guard-based one active.
#[test]
fn test_filtered_subscription_vs_manual_filtered() {
    let property = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));

    // Id-based filtered subscription.
    let manual_counter = Arc::clone(&manual_count);
    let manual_id = property
        .subscribe_filtered(
            Arc::new(move |_, _| {
                manual_counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create manual filtered subscription");

    // Guard-based filtered subscription with the same predicate.
    let auto_counter = Arc::clone(&auto_count);
    let _auto_guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                auto_counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create automatic filtered subscription");

    // While both are registered their counters move in lockstep:
    // 0 -> 5 fires, 5 -> 3 is filtered out, 3 -> 10 fires.
    let steps = [
        (5, 1, "Failed to set property to 5 in filtered vs manual test"),
        (3, 1, "Failed to set property to 3 in filtered vs manual test"),
        (10, 2, "Failed to set property to 10 in filtered vs manual test"),
    ];
    for &(value, expected_calls, failure_message) in steps.iter() {
        property.set(value).expect(failure_message);
        assert_eq!(manual_count.load(Ordering::SeqCst), expected_calls);
        assert_eq!(auto_count.load(Ordering::SeqCst), expected_calls);
    }

    // Remove the id-based observer.
    property
        .unsubscribe(manual_id)
        .expect("Failed to unsubscribe manual filtered observer");

    // 10 -> 15 is an increase, but only the guard-based observer is left.
    property
        .set(15)
        .expect("Failed to set property to 15 after manual cleanup");
    assert_eq!(manual_count.load(Ordering::SeqCst), 2);
    assert_eq!(auto_count.load(Ordering::SeqCst), 3);

    // `_auto_guard` is dropped when the function returns.
}
2460
/// Smoke test: a filter that panics for one particular value must not make
/// `set` fail or leave the property unusable afterwards.
///
/// Only the call count before the panicking value is asserted; whether the
/// observer runs for later values is deliberately left unchecked.
#[test]
fn test_filtered_subscription_with_panicking_filter() {
    let property = ObservableProperty::new(0);
    let invocations = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&invocations);
    let _guard = property
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            // Accept everything except 42, which blows up inside the filter.
            |_, new| match *new {
                42 => panic!("Filter panic on 42"),
                _ => true,
            },
        )
        .expect("Failed to create panicking filter subscription");

    // An ordinary value passes the filter and reaches the observer.
    property
        .set(1)
        .expect("Failed to set property to 1 in panicking filter test");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // The value that makes the filter panic: `set` itself must still succeed.
    property
        .set(42)
        .expect("Failed to set property to 42 in panicking filter test");

    // The property must remain usable afterwards. The resulting call count is
    // implementation-dependent, so it is not asserted.
    property
        .set(2)
        .expect("Failed to set property to 2 after filter panic");
}
2495
/// Stress test: several threads concurrently create subscription guards,
/// change the property, and drop guards. The test passes if no thread panics
/// and the property still accepts a `set` afterwards.
#[test]
fn test_subscription_thread_safety() {
    let property = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 50;
    let total_calls = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_property = Arc::clone(&property);
        let shared_calls = Arc::clone(&total_calls);

        workers.push(thread::spawn(move || {
            // Guards held by this worker; dropping one releases its observer.
            let mut held_guards = Vec::new();

            for i in 0..operations_per_thread {
                let call_counter = Arc::clone(&shared_calls);
                let guard = shared_property
                    .subscribe_with_subscription(Arc::new(move |old, new| {
                        call_counter.fetch_add(1, Ordering::SeqCst);
                        // Token amount of work using the observed values.
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Should be able to create subscription");
                held_guards.push(guard);

                // Notify whatever observers are registered right now.
                shared_property
                    .set(thread_id * 1000 + i)
                    .expect("Should be able to set value");

                // Every fifth iteration, release the oldest guard.
                if i % 5 == 0 && !held_guards.is_empty() {
                    drop(held_guards.remove(0));
                }
            }

            // Remaining guards are dropped together with `held_guards`.
        }));
    }

    // Every worker must finish without panicking.
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must still accept changes after the concurrent churn.
    property.set(9999).expect("Property should still work");

    // The exact number of calls depends on thread interleaving, so it is only
    // reported, not asserted.
    println!(
        "Total observer calls: {}",
        total_calls.load(Ordering::SeqCst)
    );
}
2554
/// Checks that a subscription guard created on one thread can be moved to and
/// dropped on another thread, deactivating the observer for all threads.
#[test]
fn test_subscription_cross_thread_drop() {
    let property = Arc::new(ObservableProperty::new(0));
    let invocations = Arc::new(AtomicUsize::new(0));

    // The guard is created on the test's own thread.
    let counter = Arc::clone(&invocations);
    let guard = property
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cross-thread drop test");

    // Sanity check: the observer fires here first.
    property
        .set(1)
        .expect("Failed to set property value in cross-thread drop test");
    assert_eq!(invocations.load(Ordering::SeqCst), 1);

    // Hand the guard (plus shared handles) over to a worker thread.
    let worker_property = Arc::clone(&property);
    let worker_invocations = Arc::clone(&invocations);

    let worker = thread::spawn(move || {
        // The observer still fires for changes made on the worker.
        worker_property
            .set(2)
            .expect("Failed to set property value in other thread");
        assert_eq!(worker_invocations.load(Ordering::SeqCst), 2);

        // Release the guard on the worker thread.
        drop(guard);

        // From now on changes must not reach the observer.
        worker_property
            .set(3)
            .expect("Failed to set property value after drop in other thread");
        assert_eq!(worker_invocations.load(Ordering::SeqCst), 2);
    });

    worker
        .join()
        .expect("Failed to join cross-thread drop test thread");

    // The observer stays inactive on the original thread too.
    property
        .set(4)
        .expect("Failed to set property value after thread join");
    assert_eq!(invocations.load(Ordering::SeqCst), 2);
}
2595
/// Stress test: some threads repeatedly create and drop subscription guards
/// while other threads repeatedly change the property. The test passes if no
/// thread panics and the property still accepts a `set` afterwards.
#[test]
fn test_concurrent_subscription_creation_and_property_changes() {
    let property = Arc::new(ObservableProperty::new(0));
    let total_notifications = Arc::new(AtomicUsize::new(0));
    let num_subscriber_threads = 4;
    let num_setter_threads = 2;
    let operations_per_thread = 25;

    // Subscriber threads are spawned first and joined first.
    let mut workers = Vec::new();

    // Subscriber threads: each iteration holds a guard briefly, then drops it.
    for _ in 0..num_subscriber_threads {
        let shared_property = Arc::clone(&property);
        let shared_tally = Arc::clone(&total_notifications);

        workers.push(thread::spawn(move || {
            for _ in 0..operations_per_thread {
                let tally = Arc::clone(&shared_tally);
                let _guard = shared_property
                    .subscribe_with_subscription(Arc::new(move |_, _| {
                        tally.fetch_add(1, Ordering::SeqCst);
                    }))
                    .expect("Should create subscription");

                // Give setter threads a chance to hit this observer.
                thread::sleep(Duration::from_millis(1));

                // `_guard` is dropped at the end of each iteration.
            }
        }));
    }

    // Setter threads: each writes a stream of distinct values.
    for thread_id in 0..num_setter_threads {
        let shared_property = Arc::clone(&property);

        workers.push(thread::spawn(move || {
            for i in 0..operations_per_thread * 2 {
                shared_property
                    .set(thread_id * 10000 + i)
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // Every worker must finish without panicking.
    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // The property must still accept changes after the concurrent churn.
    property.set(99999).expect("Property should still work");

    // The total depends on thread interleaving, so it is only reported.
    println!(
        "Total notifications during concurrent test: {}",
        total_notifications.load(Ordering::SeqCst)
    );
}
2660
/// Stress test for filtered, guard-based subscriptions used from several
/// threads at once. After all threads have finished (and so dropped their
/// guards), further changes must not produce any notifications.
#[test]
fn test_filtered_subscription_thread_safety() {
    let property = Arc::new(ObservableProperty::new(0));
    let increase_notifications = Arc::new(AtomicUsize::new(0));
    let decrease_notifications = Arc::new(AtomicUsize::new(0));
    let num_threads = 6;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_property = Arc::clone(&property);
        let shared_increases = Arc::clone(&increase_notifications);
        let shared_decreases = Arc::clone(&decrease_notifications);

        workers.push(thread::spawn(move || {
            // Observer that fires only when the value goes up.
            let on_increase = Arc::clone(&shared_increases);
            let _increase_guard = shared_property
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        on_increase.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new > old,
                )
                .expect("Should create filtered subscription");

            // Observer that fires only when the value goes down.
            let on_decrease = Arc::clone(&shared_decreases);
            let _decrease_guard = shared_property
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        on_decrease.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new < old,
                )
                .expect("Should create filtered subscription");

            // Write values that cycle through base..base+9 twice, which
            // produces a mix of increases and decreases.
            let base_value = thread_id * 100;
            for i in 0..20 {
                shared_property
                    .set(base_value + (i % 10))
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }

            // Both guards are dropped when the worker closure returns.
        }));
    }

    // Every worker must finish without panicking.
    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // Snapshot the totals now that every guard has been dropped.
    let initial_inc = increase_notifications.load(Ordering::SeqCst);
    let initial_dec = decrease_notifications.load(Ordering::SeqCst);

    property.set(1000).expect("Property should still work");
    property.set(2000).expect("Property should still work");

    // With no subscriptions left, the totals must not move.
    assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
    assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);

    println!(
        "Increase notifications: {}, Decrease notifications: {}",
        initial_inc, initial_dec
    );
}
2731
#[test]
fn test_subscription_with_async_property_changes() {
    // Two observers (one deliberately slow) watch the same property. Two
    // `set` calls are timed against two `set_async` calls; the async pair must
    // return sooner, and after a grace period both observers must have seen
    // all four changes.
    let prop = Arc::new(ObservableProperty::new(0));
    let sync_notifications = Arc::new(AtomicUsize::new(0));
    let async_notifications = Arc::new(AtomicUsize::new(0));

    // Slow observer: counts, then sleeps 5 ms per notification.
    let slow_hits = Arc::clone(&sync_notifications);
    let _sync_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            slow_hits.fetch_add(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5));
            println!("Sync observer: {} -> {}", old, new);
        }))
        .expect("Failed to create sync subscription");

    // Fast observer: counts and logs only.
    let fast_hits = Arc::clone(&async_notifications);
    let _async_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            fast_hits.fetch_add(1, Ordering::SeqCst);
            println!("Async observer: {} -> {}", old, new);
        }))
        .expect("Failed to create async subscription");

    // Time two synchronous writes.
    let sync_started = std::time::Instant::now();
    prop.set(1).expect("Failed to set property value 1 in async test");
    prop.set(2).expect("Failed to set property value 2 in async test");
    let sync_duration = sync_started.elapsed();

    // Time two asynchronous writes.
    let async_started = std::time::Instant::now();
    prop.set_async(3).expect("Failed to set property value 3 async");
    prop.set_async(4).expect("Failed to set property value 4 async");
    let async_duration = async_started.elapsed();

    // The asynchronous pair has to come back faster than the synchronous pair.
    assert!(async_duration < sync_duration);

    // Grace period for the background notifications to finish.
    thread::sleep(Duration::from_millis(50));

    // Four writes in total, so each observer must have counted four calls.
    let slow_total = sync_notifications.load(Ordering::SeqCst);
    assert_eq!(slow_total, 4);
    let fast_total = async_notifications.load(Ordering::SeqCst);
    assert_eq!(fast_total, 4);

    // Both subscription guards are released when the test returns.
}
2782
#[test]
fn test_subscription_creation_with_poisoned_lock() {
    // Once the property's lock has been poisoned, creating plain or filtered
    // subscriptions must return an error, and dropping a guard obtained
    // earlier must not panic.
    let prop = Arc::new(ObservableProperty::new(0));
    let poison_handle = Arc::clone(&prop);

    // A healthy subscription taken out before the lock is poisoned.
    let call_count = Arc::new(AtomicUsize::new(0));
    let hits = Arc::clone(&call_count);
    let existing_subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poisoning");

    // Panic on another thread while holding the write guard; the join error
    // is expected and ignored.
    let poisoner = thread::spawn(move || {
        let _guard = poison_handle
            .inner
            .write()
            .expect("Failed to acquire write lock for subscription poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poisoner.join();

    // A plain subscription can no longer be created.
    let plain_attempt = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(plain_attempt.is_err());

    // Neither can a filtered one.
    let filtered_attempt =
        prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(filtered_attempt.is_err());

    // Releasing the pre-poison guard must be panic-free.
    drop(existing_subscription);
}
2816
#[test]
fn test_subscription_cleanup_behavior_with_poisoned_lock() {
    // Dropping a subscription guard after the property's lock has been
    // poisoned must not panic. Reaching the end of the function is the
    // success criterion.
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    let hits = Arc::clone(&call_count);
    let subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cleanup behavior test");

    // Sanity check: the observer is live before the lock is poisoned.
    prop.set(1).expect("Failed to set property value in cleanup behavior test");
    let calls_before_poison = call_count.load(Ordering::SeqCst);
    assert_eq!(calls_before_poison, 1);

    // Panic on a helper thread while it holds the write guard.
    let poisoner = {
        let poison_handle = Arc::clone(&prop);
        thread::spawn(move || {
            let _guard = poison_handle
                .inner
                .write()
                .expect("Failed to acquire write lock for cleanup behavior poison test");
            panic!("Deliberate panic to poison the lock");
        })
    };
    let _ = poisoner.join();

    // The guard's Drop has to cope with the poisoned lock.
    drop(subscription);
}
2849
#[test]
fn test_multiple_subscription_cleanup_with_poisoned_lock() {
    // Several subscription guards are created and confirmed to be live, the
    // property's lock is then poisoned, and finally every guard is dropped.
    // None of the drops may panic; reaching the end of the function is the
    // success criterion for the cleanup part.
    let prop = Arc::new(ObservableProperty::new(0));
    let num_subscriptions = 5;

    // One counter shared by every observer, so that the "all observers work"
    // step below can actually be asserted. Per-observer counters that go out
    // of scope inside the loop cannot be checked afterwards.
    let total_calls = Arc::new(AtomicUsize::new(0));
    let mut subscriptions = Vec::new();

    // Create multiple subscriptions
    for i in 0..num_subscriptions {
        let count = total_calls.clone();
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}: {} -> {}", i, old, new);
            }))
            .expect("Failed to create subscription in multiple cleanup test");
        subscriptions.push(subscription);
    }

    // Verify they all work: one synchronous write must notify each observer
    // exactly once.
    prop.set(42).expect("Failed to set property value in multiple cleanup test");
    assert_eq!(
        total_calls.load(Ordering::SeqCst),
        num_subscriptions,
        "every subscribed observer should be notified once before poisoning"
    );

    // Poison the lock: panic on a helper thread while it holds the write
    // guard. The join error is expected and ignored.
    let prop_clone = prop.clone();
    let poison_thread = thread::spawn(move || {
        let _guard = prop_clone
            .inner
            .write()
            .expect("Failed to acquire write lock for multiple cleanup poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poison_thread.join();

    // Drop all subscriptions - none should panic
    for subscription in subscriptions {
        drop(subscription);
    }
}
2886
#[test]
fn test_subscription_behavior_before_and_after_poison() {
    // A subscription taken before the lock is poisoned works and can still be
    // dropped afterwards; a subscription attempted after poisoning is
    // rejected and its observer never runs.
    let prop = Arc::new(ObservableProperty::new(0));
    let before_poison_count = Arc::new(AtomicUsize::new(0));
    let after_poison_count = Arc::new(AtomicUsize::new(0));

    // Healthy phase: subscribe and confirm one notification arrives.
    let early_hits = Arc::clone(&before_poison_count);
    let before_subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            early_hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poison test");

    prop.set(1).expect("Failed to set property value before poison test");
    let early_total = before_poison_count.load(Ordering::SeqCst);
    assert_eq!(early_total, 1);

    // Poison the lock by panicking on a helper thread that holds the write
    // guard; the join error is expected and ignored.
    let poisoner = {
        let poison_handle = Arc::clone(&prop);
        thread::spawn(move || {
            let _guard = poison_handle
                .inner
                .write()
                .expect("Failed to acquire write lock for before/after poison test");
            panic!("Deliberate panic to poison the lock");
        })
    };
    let _ = poisoner.join();

    // Poisoned phase: a new subscription must be refused.
    let late_hits = Arc::clone(&after_poison_count);
    let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
        late_hits.fetch_add(1, Ordering::SeqCst);
    }));
    assert!(after_result.is_err());

    // The pre-poison guard must still drop without panicking.
    drop(before_subscription);

    // The rejected observer was never invoked.
    let late_total = after_poison_count.load(Ordering::SeqCst);
    assert_eq!(late_total, 0);
}
2926
#[test]
fn test_concurrent_subscription_drops_with_poison() {
    // Ten subscription guards are created, the property's lock is poisoned,
    // and each guard is then dropped on its own thread. Every drop thread must
    // finish without panicking.
    let prop = Arc::new(ObservableProperty::new(0));
    let num_subscriptions = 10;

    // Build the guards up front, each with its own private counter.
    let subscriptions: Vec<_> = (0..num_subscriptions)
        .map(|i| {
            let count = Arc::new(AtomicUsize::new(0));
            prop.subscribe_with_subscription(Arc::new(move |_, _| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}", i);
            }))
            .expect("Failed to create subscription in concurrent drops test")
        })
        .collect();

    // Poison the lock by panicking on a helper thread that holds the write
    // guard; the join error is expected and ignored.
    let poisoner = {
        let poison_handle = Arc::clone(&prop);
        thread::spawn(move || {
            let _guard = poison_handle
                .inner
                .write()
                .expect("Failed to acquire write lock for concurrent drops poison test");
            panic!("Deliberate panic to poison the lock");
        })
    };
    let _ = poisoner.join();

    // Hand each guard to its own thread. The drops are staggered by a
    // deterministic 0-4 ms delay derived from the guard's index.
    let mut droppers = Vec::new();
    for (i, subscription) in subscriptions.into_iter().enumerate() {
        droppers.push(thread::spawn(move || {
            let stagger = Duration::from_millis((i as u64) % 5);
            thread::sleep(stagger);
            drop(subscription);
            println!("Dropped subscription {}", i);
        }));
    }

    // A panic inside any drop thread surfaces here as a failed join.
    for dropper in droppers {
        dropper
            .join()
            .expect("Drop thread should complete without panic");
    }
}
2977
#[test]
fn test_with_max_threads_creation() {
    // `with_max_threads` must accept different value types and thread limits,
    // and the freshly built property must hand back the initial value.
    let int_prop = ObservableProperty::with_max_threads(42, 1);
    let string_prop = ObservableProperty::with_max_threads("test".to_string(), 8);
    let float_prop = ObservableProperty::with_max_threads(0.5_f64, 16);

    let int_value = int_prop.get().expect("Failed to get prop1 value");
    assert_eq!(int_value, 42);

    let string_value = string_prop.get().expect("Failed to get prop2 value");
    assert_eq!(string_value, "test");

    let float_value = float_prop.get().expect("Failed to get prop3 value");
    assert_eq!(float_value, 0.5);
}
2990
#[test]
fn test_with_max_threads_zero_defaults_to_max_threads() {
    // A `max_threads` of zero is expected to fall back to the default limit,
    // so a property built that way must behave like one built with `new`.
    // Reading the value alone cannot show this, because `get` does not depend
    // on the thread limit. The test therefore also drives an asynchronous
    // notification through both properties and compares the outcome.
    let prop1 = ObservableProperty::with_max_threads(100, 0);
    let prop2 = ObservableProperty::new(100); // Uses default MAX_THREADS

    assert_eq!(prop1.get().expect("Failed to get prop1 value"), 100);
    assert_eq!(prop2.get().expect("Failed to get prop2 value"), 100);

    let zero_calls = Arc::new(AtomicUsize::new(0));
    let default_calls = Arc::new(AtomicUsize::new(0));

    let zero_counter = zero_calls.clone();
    let _zero_subscription = prop1
        .subscribe_with_subscription(Arc::new(move |_, _| {
            zero_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription on zero max_threads property");

    let default_counter = default_calls.clone();
    let _default_subscription = prop2
        .subscribe_with_subscription(Arc::new(move |_, _| {
            default_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription on default property");

    // The async path is the one that uses the thread limit.
    prop1
        .set_async(200)
        .expect("Failed to set value asynchronously with zero max_threads");
    prop2
        .set_async(200)
        .expect("Failed to set value asynchronously with default max_threads");

    // Bounded wait: stop as soon as both observers have run, or fall through
    // to the assertions below once the deadline passes.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while (zero_calls.load(Ordering::SeqCst) < 1 || default_calls.load(Ordering::SeqCst) < 1)
        && std::time::Instant::now() < deadline
    {
        thread::sleep(Duration::from_millis(1));
    }

    assert_eq!(
        zero_calls.load(Ordering::SeqCst),
        1,
        "observer on the zero max_threads property should be notified once"
    );
    assert_eq!(
        default_calls.load(Ordering::SeqCst),
        1,
        "observer on the default property should be notified once"
    );
    assert_eq!(prop1.get().expect("Failed to get prop1 value"), 200);
    assert_eq!(prop2.get().expect("Failed to get prop2 value"), 200);
}
3002
#[test]
fn test_with_max_threads_basic_functionality() {
    // A property built with `with_max_threads` must notify its observer for
    // both `set` and `set_async`, passing the correct old and new values.
    //
    // The observer only records what it sees; all assertions run on the test
    // thread. An assertion inside the observer could never fail this test:
    // the crate isolates observer panics, and async observers run on
    // background threads. Hard-coding `0 -> 42` there would also be wrong for
    // the second notification.
    let prop = ObservableProperty::with_max_threads(0, 2);
    let call_count = Arc::new(AtomicUsize::new(0));
    let transitions: Arc<std::sync::Mutex<Vec<(i32, i32)>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));

    // Subscribe an observer
    let count = call_count.clone();
    let transition_log = transitions.clone();
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            // Record first, count second: once the counter is visible to the
            // test thread the matching transition is already in the log.
            if let Ok(mut log) = transition_log.lock() {
                log.push((*old, *new));
            }
            count.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for max_threads test");

    // Test synchronous set
    prop.set(42).expect("Failed to set value synchronously");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
    let after_sync = transitions
        .lock()
        .expect("Transition log lock should not be poisoned")
        .clone();
    assert_eq!(after_sync, vec![(0, 42)]);

    // Test asynchronous set
    prop.set_async(43).expect("Failed to set value asynchronously");

    // Wait for async observers to complete
    thread::sleep(Duration::from_millis(50));
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
    let after_async = transitions
        .lock()
        .expect("Transition log lock should not be poisoned")
        .clone();
    assert_eq!(after_async, vec![(0, 42), (42, 43)]);
}
3029
#[test]
fn test_with_max_threads_async_performance() {
    // With a thread limit of one and four observers that each sleep 25 ms,
    // `set_async` must still return in under 50 ms, and all four observers
    // must have finished after a 200 ms grace period.
    let prop = ObservableProperty::with_max_threads(0, 1);
    let slow_call_count = Arc::new(AtomicUsize::new(0));

    // Keep the guards alive until the end of the test.
    let _slow_guards: Vec<_> = (0..4)
        .map(|_| {
            let finished = Arc::clone(&slow_call_count);
            prop.subscribe_with_subscription(Arc::new(move |_, _| {
                thread::sleep(Duration::from_millis(25));
                finished.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create slow observer subscription")
        })
        .collect();

    // Only the call itself is timed, not the observers' work.
    let started = std::time::Instant::now();
    prop.set_async(42).expect("Failed to set value asynchronously");
    let async_duration = started.elapsed();

    assert!(
        async_duration < Duration::from_millis(50),
        "Async set should return quickly"
    );

    // Grace period for the slow observers.
    thread::sleep(Duration::from_millis(200));
    let completed = slow_call_count.load(Ordering::SeqCst);
    assert_eq!(completed, 4);
}
3061
#[test]
fn test_with_max_threads_vs_regular_constructor() {
    // A property from `new` and one from `with_max_threads(_, 4)` must give
    // the same notification counts for a synchronous write followed by an
    // asynchronous one.
    let prop_regular = ObservableProperty::new(42);
    let prop_custom = ObservableProperty::with_max_threads(42, 4);

    let count_regular = Arc::new(AtomicUsize::new(0));
    let count_custom = Arc::new(AtomicUsize::new(0));

    let regular_hits = Arc::clone(&count_regular);
    let _regular_guard = prop_regular
        .subscribe_with_subscription(Arc::new(move |_, _| {
            regular_hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create regular subscription");

    let custom_hits = Arc::clone(&count_custom);
    let _custom_guard = prop_custom
        .subscribe_with_subscription(Arc::new(move |_, _| {
            custom_hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create custom subscription");

    // Synchronous write: one notification each, visible immediately.
    prop_regular.set(100).expect("Failed to set regular property");
    prop_custom.set(100).expect("Failed to set custom property");

    let regular_after_sync = count_regular.load(Ordering::SeqCst);
    assert_eq!(regular_after_sync, 1);
    let custom_after_sync = count_custom.load(Ordering::SeqCst);
    assert_eq!(custom_after_sync, 1);

    // Asynchronous write: one more notification each, after a grace period.
    prop_regular
        .set_async(200)
        .expect("Failed to set regular property async");
    prop_custom
        .set_async(200)
        .expect("Failed to set custom property async");

    thread::sleep(Duration::from_millis(50));
    let regular_after_async = count_regular.load(Ordering::SeqCst);
    assert_eq!(regular_after_async, 2);
    let custom_after_async = count_custom.load(Ordering::SeqCst);
    assert_eq!(custom_after_async, 2);
}
3100
#[test]
fn test_with_max_threads_large_values() {
    // A thread limit far above the number of observers (1000 for a single
    // observer) must not disturb asynchronous notification.
    let prop = ObservableProperty::with_max_threads(0, 1000);
    let call_count = Arc::new(AtomicUsize::new(0));

    let hits = Arc::clone(&call_count);
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for large max_threads test");

    prop.set_async(42)
        .expect("Failed to set value with large max_threads");

    // Grace period for the background notification, then expect one call.
    thread::sleep(Duration::from_millis(50));
    let observed = call_count.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
3121
#[test]
fn test_with_max_threads_clone_behavior() {
    // A clone of a `with_max_threads` property must observe the same changes
    // as the original: an observer registered through either handle is
    // notified by an asynchronous write through either handle.
    let prop1 = ObservableProperty::with_max_threads(42, 2);
    let prop2 = prop1.clone();

    let call_count1 = Arc::new(AtomicUsize::new(0));
    let call_count2 = Arc::new(AtomicUsize::new(0));

    // Subscribe through the original handle. The failure message names the
    // handle the subscription was made on, so a failure points at the right
    // one.
    let count1 = call_count1.clone();
    let _sub1 = prop1
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription on original property");

    // Subscribe through the cloned handle.
    let count2 = call_count2.clone();
    let _sub2 = prop2
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription on cloned property");

    // Changes through either property should trigger both observers
    prop1.set_async(100).expect("Failed to set value through prop1");
    thread::sleep(Duration::from_millis(50));

    assert_eq!(call_count1.load(Ordering::SeqCst), 1);
    assert_eq!(call_count2.load(Ordering::SeqCst), 1);

    prop2.set_async(200).expect("Failed to set value through prop2");
    thread::sleep(Duration::from_millis(50));

    assert_eq!(call_count1.load(Ordering::SeqCst), 2);
    assert_eq!(call_count2.load(Ordering::SeqCst), 2);
}
3158
#[test]
fn test_with_max_threads_thread_safety() {
    // Five threads concurrently subscribe to, and asynchronously write to, a
    // shared property limited to three notification threads. The exact number
    // of notifications depends on timing, so the test only requires that at
    // least one was delivered and that no worker panicked.
    let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
    let call_count = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::new();
    for thread_id in 0..5 {
        let shared_prop = Arc::clone(&prop);
        let hits = Arc::clone(&call_count);

        workers.push(thread::spawn(move || {
            let _subscription = shared_prop
                .subscribe_with_subscription(Arc::new(move |_, _| {
                    hits.fetch_add(1, Ordering::SeqCst);
                }))
                .expect("Failed to create thread-safe subscription");

            // Fire one asynchronous change from this worker.
            shared_prop
                .set_async(thread_id * 10)
                .expect("Failed to set value from thread");

            // Hold the subscription briefly before the guard is dropped.
            thread::sleep(Duration::from_millis(10));
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // Grace period for background notifications still in flight.
    thread::sleep(Duration::from_millis(100));

    let delivered = call_count.load(Ordering::SeqCst);
    assert!(delivered > 0);
}
3200
#[test]
fn test_with_max_threads_error_handling() {
    // On a healthy `with_max_threads` property with a no-op observer, both
    // `set` and `set_async` must report success, and the value from the last
    // write must be readable straight away.
    let prop = ObservableProperty::with_max_threads(42, 2);

    let _subscription = prop
        .subscribe_with_subscription(Arc::new(|_, _| {
            // Intentionally empty observer.
        }))
        .expect("Failed to create subscription for error handling test");

    let sync_outcome = prop.set(100);
    assert!(sync_outcome.is_ok());

    let async_outcome = prop.set_async(200);
    assert!(async_outcome.is_ok());

    let current = prop.get().expect("Failed to get value after error test");
    assert_eq!(current, 200);
}
3217}